4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/ofd/ofd_objects.c
33 * This file contains OSD API methods related to OBD Filter Device (OFD)
36 * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
37 * Author: Mikhail Pershin <mike.pershin@intel.com>
40 #define DEBUG_SUBSYSTEM S_FILTER
42 #include <dt_object.h>
43 #include <lustre_lfsck.h>
44 #include <lustre_export.h>
46 #include "ofd_internal.h"
49 * Get object version from disk and check it.
51 * This function checks object version from disk with
52 * ofd_thread_info::fti_pre_version filled from incoming RPC. This is part of
53 * VBR (Version-Based Recovery) and ensures that object has the same version
54 * upon replay as it has during original modification.
56 * \param[in] info execution thread OFD private data
57 * \param[in] fo OFD object
59 * \retval 0 if version matches
60 * \retval -EOVERFLOW on version mismatch
/*
 * VBR check helper. Reads the current object version through
 * dt_version_get() and compares it with info->fti_pre_version. On a
 * mismatch the export is marked with exp_vbr_failed = 1 while holding
 * exp_lock. The version read from disk is stored back into
 * info->fti_pre_version by the last statement.
 */
62 static int ofd_version_get_check(struct ofd_thread_info *info,
63 struct ofd_object *fo)
65 dt_obj_version_t curr_version;
/* Without an export there is no exp_vbr_failed flag to update below. */
67 if (info->fti_exp == NULL)
70 curr_version = dt_version_get(info->fti_env, ofd_object_child(fo));
/*
 * The version value is reinterpreted as signed to recognise an
 * -EOPNOTSUPP result (presumably the backend has no version support;
 * TODO confirm).
 */
71 if ((__s64)curr_version == -EOPNOTSUPP)
73 /* VBR: version is checked always because costs nothing */
/* A zero fti_pre_version never counts as a mismatch. */
74 if (info->fti_pre_version != 0 &&
75 info->fti_pre_version != curr_version) {
76 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
77 info->fti_pre_version, curr_version);
/* Flag the failed VBR check on the export, serialised by exp_lock. */
78 spin_lock(&info->fti_exp->exp_lock);
79 info->fti_exp->exp_vbr_failed = 1;
80 spin_unlock(&info->fti_exp->exp_lock);
/* Remember the version seen on disk in the thread info. */
83 info->fti_pre_version = curr_version;
88 * Get OFD object by FID.
90 * This function finds OFD slice of compound object with the given FID.
92 * \param[in] env execution environment
93 * \param[in] ofd OFD device
94 * \param[in] fid FID of the object
96 * \retval pointer to the found ofd_object
97 * \retval ERR_PTR(errno) in case of error
/*
 * Looks the FID up with lu_object_find() on the lu_device embedded in
 * the OFD dt_device (ofd_dt_dev.dd_lu_dev); the last argument passed
 * is NULL. An error result is re-typed with ERR_CAST and handed back
 * as the function result.
 */
99 struct ofd_object *ofd_object_find(const struct lu_env *env,
100 struct ofd_device *ofd,
101 const struct lu_fid *fid)
103 struct ofd_object *fo;
108 o = lu_object_find(env, &ofd->ofd_dt_dev.dd_lu_dev, fid, NULL);
/* Success is the expected case, hence the likely() hint. */
109 if (likely(!IS_ERR(o)))
112 fo = ERR_CAST(o); /* return error */
118 * Get FID of parent MDT object.
120 * This function reads extended attribute XATTR_NAME_FID of OFD object which
121 * contains the MDT parent object FID and saves it in ofd_object::ofo_ff.
123 * The filter_fid::ff_parent::f_ver field currently holds
124 * the OST-object index in the parent MDT-object's layout EA,
125 * not the actual FID::f_ver of the parent. We therefore access
126 * it via the macro f_stripe_idx.
128 * \param[in] env execution environment
129 * \param[in] fo OFD object
131 * \retval 0 if successful
132 * \retval -ENODATA if there is no such xattr
133 * \retval negative value on error
/*
 * Loads XATTR_NAME_FID into the filter_fid kept in the object
 * (fo->ofo_ff), using the per-thread lu_buf info->fti_buf as the
 * transfer descriptor, and converts the result to CPU byte order.
 */
135 int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo)
137 struct ofd_thread_info *info = ofd_info(env);
138 struct filter_fid *ff = &fo->ofo_ff;
139 struct lu_buf *buf = &info->fti_buf;
/*
 * A sane ff_parent is tested before any xattr read; presumably it means
 * ofo_ff was already loaded and the read can be avoided (TODO confirm).
 */
142 if (fid_is_sane(&ff->ff_parent))
/* First read uses a buffer length of exactly sizeof(struct filter_fid). */
146 buf->lb_len = sizeof(*ff);
147 rc = dt_xattr_get(env, ofd_object_child(fo), buf, XATTR_NAME_FID);
/*
 * Second read: a temporary buffer that is FILTER_FID_EXTRA_SIZE bytes
 * larger is allocated, the xattr is read into it, only the leading
 * sizeof(*ff) bytes are copied into ofo_ff, and the buffer is freed.
 * Presumably this path handles an xattr larger than struct filter_fid
 * (TODO confirm the triggering condition).
 */
149 struct filter_fid *ff_new;
151 OBD_ALLOC(ff_new, sizeof(*ff) + FILTER_FID_EXTRA_SIZE);
154 buf->lb_buf = ff_new;
155 buf->lb_len = sizeof(*ff) + FILTER_FID_EXTRA_SIZE;
156 rc = dt_xattr_get(env, ofd_object_child(fo), buf,
159 memcpy(ff, ff_new, sizeof(*ff));
160 OBD_FREE(ff_new, sizeof(*ff) + FILTER_FID_EXTRA_SIZE);
/*
 * Fewer bytes than a struct lu_fid: the parent FID is reset with
 * fid_zero().
 * NOTE(review): if rc is a plain int, it is compared against a size_t,
 * so the comparison is unsigned and a negative rc would not satisfy it;
 * negative values must be handled before this point - confirm.
 */
165 if (unlikely(rc < sizeof(struct lu_fid))) {
166 fid_zero(&ff->ff_parent);
/* In-place conversion (source and destination are the same buffer). */
170 filter_fid_le_to_cpu(ff, ff, sizeof(*ff));
/*
 * Context attached to a transaction commit callback for precreation.
 * ofd_cb_precreate() recovers it from the embedded opc_cb with
 * container_of() and uses opc_oseq to reach the sequence whose
 * os_precreate_in_progress counter it adjusts.
 */
175 struct ofd_precreate_cb {
176 struct dt_txn_commit_cb opc_cb;
177 struct ofd_seq *opc_oseq;
/*
 * Commit callback installed by ofd_precreate_cb_add(). It subtracts
 * opc_objects from the os_precreate_in_progress counter of the sequence
 * and drops a sequence reference with ofd_seq_put(), matching the
 * refcount_inc() done when the callback was registered.
 */
181 static void ofd_cb_precreate(struct lu_env *env, struct thandle *th,
182 struct dt_txn_commit_cb *cb, int err)
184 struct ofd_precreate_cb *opc;
185 struct ofd_seq *oseq;
/* cb is embedded in struct ofd_precreate_cb as the opc_cb member. */
187 opc = container_of(cb, struct ofd_precreate_cb, opc_cb);
188 oseq = opc->opc_oseq;
190 CDEBUG(D_OTHER, "Sub %d from %d for "DFID", th_sync %d\n",
191 opc->opc_objects, atomic_read(&oseq->os_precreate_in_progress),
192 PFID(&oseq->os_oi.oi_fid), th->th_sync);
193 atomic_sub(opc->opc_objects, &oseq->os_precreate_in_progress);
194 ofd_seq_put(env, opc->opc_oseq);
/*
 * Registers ofd_cb_precreate() as a commit callback on transaction th
 * and adds objects to oseq->os_precreate_in_progress, so that the
 * counter tracks precreated objects whose transaction has not yet
 * committed.
 */
198 static int ofd_precreate_cb_add(const struct lu_env *env, struct thandle *th,
199 struct ofd_seq *oseq, int objects)
201 struct ofd_precreate_cb *opc;
202 struct dt_txn_commit_cb *dcb;
/* Snapshot of the in-progress counter, used for the log and the limit. */
209 precreate = atomic_read(&oseq->os_precreate_in_progress);
/* Reference for the callback; released by ofd_seq_put() in the callback. */
210 refcount_inc(&oseq->os_refc);
211 opc->opc_oseq = oseq;
212 opc->opc_objects = objects;
213 CDEBUG(D_OTHER, "Add %d to %d for "DFID", th_sync %d\n",
214 opc->opc_objects, precreate,
215 PFID(&oseq->os_oi.oi_fid), th->th_sync);
/*
 * Limit check on uncommitted precreates: current count plus this batch
 * against 5 * OST_MAX_PRECREATE.
 * NOTE(review): presumably this requests a synchronous commit through
 * th->th_sync - confirm.
 */
217 if ((precreate + objects) >= (5 * OST_MAX_PRECREATE))
221 dcb->dcb_func = ofd_cb_precreate;
222 INIT_LIST_HEAD(&dcb->dcb_linkage);
223 strscpy(dcb->dcb_name, "ofd_cb_precreate", sizeof(dcb->dcb_name));
225 rc = dt_trans_cb_add(th, dcb);
/*
 * Drops the reference taken above; presumably only on the failure path
 * of dt_trans_cb_add(), where the callback will never run (TODO confirm).
 */
227 ofd_seq_put(env, oseq);
/* Account the batch as in progress until the callback subtracts it. */
232 atomic_add(objects, &oseq->os_precreate_in_progress);
238 * Precreate the given number \a nr of objects in the given sequence \a oseq.
240 * This function precreates new OST objects in the given sequence.
241 * The precreation starts from \a id and creates \a nr objects sequentially.
244 * This function may create fewer objects than requested.
246 * We mark object SUID+SGID to flag it for accepting UID+GID from client on
247 * first write. Currently the permission bits on the OST are never used,
250 * Initialize a/c/m time so any client timestamp will always be newer and
251 * update the inode. The ctime = 0 case is also handled specially in
252 * osd_inode_setattr(). See LU-221, LU-1042 for details.
254 * \param[in] env execution environment
255 * \param[in] ofd OFD device
256 * \param[in] id object ID to start precreation from
257 * \param[in] oseq object sequence
258 * \param[in] nr number of objects to precreate
259 * \param[in] sync synchronous precreation flag
260 * \param[in] trans_local start local transaction
262 * \retval number of objects created, if at least one was created
263 * \retval negative value on error
/*
 * Visible phases of the implementation:
 *  - reject ranges that would pass IDIF_MAX_OID (mdt0 sequences) or
 *    OBIF_MAX_OID (other sequences) with -ENOSPC;
 *  - allocate the batch pointer array (nr_saved entries) and fill
 *    info->fti_attr: regular file, mode SUID | SGID | sticky | 0666,
 *    atime/mtime/ctime set to 0;
 *  - look up one ofd_object per id with ofd_object_find();
 *  - create a transaction, declare the LAST_ID record write and one
 *    create per object, then start the transaction (local start when
 *    trans_local is set);
 *  - write LAST_ID = id + nr - 1 before creating anything;
 *  - create the objects one by one under the object write lock and
 *    advance the in-memory last oid;
 *  - if fewer than nr objects were created, write LAST_ID again with
 *    the in-memory last oid;
 *  - register the commit callback, stop the transaction, release the
 *    objects and free the batch array.
 * The result is the number of objects created when it is positive,
 * otherwise rc; the final LASSERT requires rc < 0 when nothing was
 * created.
 */
265 int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
266 u64 id, struct ofd_seq *oseq, int nr, int sync,
269 struct ofd_thread_info *info = ofd_info(env);
270 struct ofd_object *fo = NULL;
271 struct dt_object *next;
273 struct ofd_object **batch;
274 struct lu_fid *fid = &info->fti_fid;
284 /* Don't create objects beyond the valid range for this SEQ
285 * Last object to create is (id + nr - 1), but we move -1 on LHS
286 * to +1 on RHS to evaluate constant at compile time. LU-11186
288 if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
289 id + nr > IDIF_MAX_OID + 1)) {
290 CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n",
291 ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
292 RETURN(rc = -ENOSPC);
293 } else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
294 id + nr > OBIF_MAX_OID + 1)) {
295 CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n",
296 ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
297 RETURN(rc = -ENOSPC);
300 OBD_ALLOC_PTR_ARRAY(batch, nr_saved);
304 info->fti_attr.la_valid = LA_TYPE | LA_MODE;
305 info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | S_ISVTX | 0666;
306 info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
308 info->fti_attr.la_valid |= LA_ATIME | LA_MTIME | LA_CTIME;
309 info->fti_attr.la_atime = 0;
310 info->fti_attr.la_mtime = 0;
311 info->fti_attr.la_ctime = 0;
315 /* prepare objects */
/*
 * The working FID starts as a copy of the FID of the LAST_ID object of
 * this sequence; fid_set_id() then sets the id for each object.
 */
316 *fid = *lu_object_fid(&oseq->os_lastid_obj->do_lu);
317 for (i = 0; i < nr; i++) {
318 rc = fid_set_id(fid, id + i);
327 fo = ofd_object_find(env, ofd, fid);
330 GOTO(out, rc = PTR_ERR(fo));
/* fti_buf describes the local tmp, which carries the LAST_ID value. */
338 info->fti_buf.lb_buf = &tmp;
339 info->fti_buf.lb_len = sizeof(tmp);
342 th = ofd_trans_create(env, ofd);
344 GOTO(out, rc = PTR_ERR(th));
348 rc = dt_declare_record_write(env, oseq->os_lastid_obj, &info->fti_buf,
351 GOTO(trans_stop, rc);
/* Declaration pass over the batch. */
353 for (i = 0; i < nr; i++) {
357 if (unlikely(ofd_object_exists(fo))) {
358 /* object may exist being re-created by write replay */
359 CDEBUG(D_INODE, "object %#llx/%#llx exists: "
360 DFID"\n", ostid_seq(&oseq->os_oi), id,
361 PFID(lu_object_fid(&fo->ofo_obj.do_lu)));
365 next = ofd_object_child(fo);
366 LASSERT(next != NULL);
368 rc = dt_declare_create(env, next, &info->fti_attr, NULL,
372 GOTO(trans_stop, rc);
379 /* Only needed for MDS+OSS rolling upgrade interop with 2.16+older. */
380 if (unlikely(trans_local))
381 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
383 rc = dt_trans_start(env, ofd->ofd_osd, th);
385 GOTO(trans_stop, rc);
387 CDEBUG(D_OTHER, "%s: create new object "DFID" nr %d\n",
388 ofd_name(ofd), PFID(fid), nr);
390 /* When the LFSCK scanning the whole device to verify the LAST_ID file
391 * consistency, it will load the last_id into RAM firstly, and compare
392 * the last_id with each OST-object's ID. If the later one is larger,
393 * then it will regard the LAST_ID file crashed. But during the LFSCK
394 * scanning, the OFD may continue to create new OST-objects. Those new
395 * created OST-objects will have larger IDs than the LFSCK known ones.
396 * So from the LFSCK view, it needs to re-load the last_id from disk
397 * file, and if the latest last_id is still smaller than the object's
398 * ID, then the LAST_ID file is real crashed.
400 * To make above mechanism to work, before OFD pre-create OST-objects,
401 * it needs to update the LAST_ID file firstly, otherwise, the LFSCK
402 * may cannot get latest last_id although new OST-object created. */
/* The LAST_ID write can be skipped by the fail-check below. */
403 if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_SKIP_LASTID)) {
404 tmp = cpu_to_le64(id + nr - 1);
405 dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID);
406 rc = dt_record_write(env, oseq->os_lastid_obj,
407 &info->fti_buf, &info->fti_off, th);
408 dt_write_unlock(env, oseq->os_lastid_obj);
410 GOTO(trans_stop, rc);
/* Creation pass over the batch, one object write lock at a time. */
413 for (i = 0; i < nr; i++) {
417 ofd_write_lock(env, fo);
419 /* Only the new created objects need to be recorded. */
420 if (ofd->ofd_osd->dd_record_fid_accessed) {
421 struct lfsck_req_local *lrl = &ofd_info(env)->fti_lrl;
423 lfsck_pack_rfa(lrl, lu_object_fid(&fo->ofo_obj.do_lu),
424 LEL_FID_ACCESSED, LFSCK_TYPE_LAYOUT);
425 lfsck_in_notify_local(env, ofd->ofd_osd, lrl, NULL);
/*
 * Create only when the object does not exist yet and the
 * OBD_FAIL_LFSCK_DANGLING fail-check is not triggered.
 */
428 if (likely(!ofd_object_exists(fo) &&
429 !CFS_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING))) {
430 next = ofd_object_child(fo);
431 LASSERT(next != NULL);
433 rc = dt_create(env, next, &info->fti_attr, NULL,
/* Unlock before leaving through the error path. */
435 ofd_write_unlock(env, fo);
438 GOTO(trans_stop, rc);
443 LASSERT(ofd_object_exists(fo));
445 ofd_write_unlock(env, fo);
/* Advance the in-memory last object id of the sequence. */
448 ofd_seq_last_oid_set(oseq, id + i);
452 /* NOT all the wanted objects have been created,
453 * set the LAST_ID as the real created. */
454 if (unlikely(objects < nr)) {
458 tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
459 dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID);
460 rc1 = dt_record_write(env, oseq->os_lastid_obj,
461 &info->fti_buf, &info->fti_off, th);
462 dt_write_unlock(env, oseq->os_lastid_obj);
464 CERROR("%s: fail to reset the LAST_ID for seq (%#llx"
465 ") from %llu to %llu\n", ofd_name(ofd),
466 ostid_seq(&oseq->os_oi), id + nr - 1,
467 ofd_seq_last_oid(oseq));
/*
 * NOTE(review): the int result of ofd_precreate_cb_add() is not
 * assigned on this line - confirm that ignoring it is intended.
 */
471 ofd_precreate_cb_add(env, th, oseq, objects);
473 rc2 = ofd_trans_stop(env, ofd, th, rc);
475 CERROR("%s: failed to stop transaction: rc = %d\n",
/* Cleanup walks nr_saved entries, the size the batch was allocated with. */
480 for (i = 0; i < nr_saved; i++) {
484 ofd_object_put(env, fo);
486 OBD_FREE_PTR_ARRAY(batch, nr_saved);
/* Zero objects together with rc == 0 is logged at D_ERROR level. */
488 CDEBUG((objects == 0 && rc == 0) ? D_ERROR : D_OTHER,
489 "created %d/%d objects: %d\n", objects, nr_saved, rc);
/* Invariant: nothing created implies a negative rc. */
491 LASSERT(ergo(objects == 0, rc < 0));
492 RETURN(objects > 0 ? objects : rc);
496 * Fix the OFD object ownership.
498 * If the object still has the SUID/SGID/sticky bits set, meaning that it was
499 * precreated by the MDT before it was assigned to any file (see
500 * ofd_precreate_objects()), then we accept the UID/GID/PROJID, respectively,
501 * if sent by the client to initialize the ownership of this object. We only
502 * allow this to happen once (so clear these bits) and later only allow setattr.
504 * \param[in] env execution environment
505 * \param[in] fo OFD object
506 * \param[in] la object attributes
507 * \param[in] is_setattr was this function called from setattr or not
509 * \retval 0 if successful
510 * \retval negative value on error
/*
 * Uses three mode bits of the object as one-shot markers: S_ISUID for
 * UID, S_ISGID for GID and S_ISVTX for PROJID. The current attributes
 * are read into info->fti_attr2 and compared with what la asks for.
 */
512 int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo,
513 struct lu_attr *la, int is_setattr)
515 struct ofd_thread_info *info = ofd_info(env);
516 struct lu_attr *ln = &info->fti_attr2;
/* Only relevant when at least one of UID, GID or PROJID is supplied. */
522 if (!(la->la_valid & LA_UID) && !(la->la_valid & LA_GID) &&
523 !(la->la_valid & LA_PROJID))
526 rc = dt_attr_get(env, ofd_object_child(fo), ln);
530 LASSERT(ln->la_valid & LA_MODE);
533 * Only allow setattr to change UID/GID/PROJID, if
534 * SUID+SGID is not set which means this is not
535 * initialization of this objects.
/*
 * Each id whose marker bit is already clear on the object is removed
 * from la_valid (presumably under a condition on is_setattr; TODO
 * confirm).
 */
538 if (!(ln->la_mode & S_ISUID))
539 la->la_valid &= ~LA_UID;
540 if (!(ln->la_mode & S_ISGID))
541 la->la_valid &= ~LA_GID;
542 if (!(ln->la_mode & S_ISVTX))
543 la->la_valid &= ~LA_PROJID;
546 /* Initialize ownership of this object, clear SUID+SGID bits*/
/* Presumably each test adds its marker bit to mask (TODO confirm). */
547 if ((la->la_valid & LA_UID) && (ln->la_mode & S_ISUID))
549 if ((la->la_valid & LA_GID) && (ln->la_mode & S_ISGID))
551 if ((la->la_valid & LA_PROJID) && (ln->la_mode & S_ISVTX))
/*
 * When the caller gave no mode, or this is not a setattr, the current
 * mode of the object is used as the base before mask is cleared.
 */
554 if (!(la->la_valid & LA_MODE) || !is_setattr) {
555 la->la_mode = ln->la_mode;
556 la->la_valid |= LA_MODE;
558 la->la_mode &= ~mask;
565 * Check if it needs to update filter_fid by the value of @oa.
568 * \param[in] fo ofd object
569 * \param[in] oa obdo from client or MDT
570 * \param[out] ff if filter_fid needs updating, this field is used to
571 * return the new buffer
573 * \retval < 0 error occurred
574 * \retval 0 doesn't need to update filter_fid
575 * \retval FL_XATTR_{CREATE,REPLACE} flag for xattr update
/*
 * Builds in ff the filter_fid that should be stored for the object.
 * The flags of interest are OBD_MD_FLFID, OBD_MD_FLOSTLAYOUT and
 * OBD_MD_LAYOUT_VERSION. ff starts either zeroed (no xattr yet,
 * LU_XATTR_CREATE) or as a copy of the cached fo->ofo_ff
 * (LU_XATTR_REPLACE), is then updated from oa, and is converted to
 * little-endian when it differs from the cached copy.
 */
577 int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo,
578 const struct obdo *oa, struct filter_fid *ff)
584 (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION)))
/* Make sure the cached copy is loaded; a missing xattr is not an error. */
587 rc = ofd_object_ff_load(env, fo);
588 if (rc < 0 && rc != -ENODATA)
/* ff is modified and byte-swapped below, so it must not be the cache. */
591 LASSERT(ff != &fo->ofo_ff);
592 if (rc == -ENODATA) {
593 rc = LU_XATTR_CREATE;
594 memset(ff, 0, sizeof(*ff));
596 rc = LU_XATTR_REPLACE;
597 memcpy(ff, &fo->ofo_ff, sizeof(*ff));
600 if (oa->o_valid & OBD_MD_FLFID) {
601 /* packing fid and converting it to LE for storing into EA.
602 * Here ->o_stripe_idx should be filled by LOV and rest of
603 * fields - by client. */
604 ff->ff_parent.f_seq = oa->o_parent_seq;
605 ff->ff_parent.f_oid = oa->o_parent_oid;
606 /* XXX: we are ignoring o_parent_ver here, since this should
607 * be the same for all objects in this fileset. */
608 ff->ff_parent.f_ver = oa->o_stripe_idx;
610 if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
611 ff->ff_layout = oa->o_layout;
613 if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
614 CDEBUG(D_INODE, DFID": OST("DFID") layout version %u -> %u\n",
615 PFID(&fo->ofo_ff.ff_parent),
616 PFID(lu_object_fid(&fo->ofo_obj.do_lu)),
617 ff->ff_layout_version, oa->o_layout_version);
620 * resync write from client on non-primary objects and
621 * resync start from MDS on primary objects will contain
622 * LU_LAYOUT_RESYNC flag in the @oa.
624 * The layout version checking for write/punch from client
625 * happens in ofd_verify_layout_version() before coming to
626 * here, so that resync with smaller layout version client
627 * will be rejected there, the biggest resync version will
628 * be recorded in the OFD objects.
/* A stored version carrying LU_LAYOUT_RESYNC is reset to 0 first. */
630 if (ff->ff_layout_version & LU_LAYOUT_RESYNC) {
631 /* this opens a new era of writing */
632 ff->ff_layout_version = 0;
636 /* it's not allowed to change it to a smaller value */
637 if (ofd_layout_version_less(oa->o_layout_version,
638 ff->ff_layout_version))
/*
 * Either adopt the incoming version (nothing stored yet, or the
 * incoming one carries LU_LAYOUT_RESYNC), or keep the stored version
 * and widen ff_range to cover the distance to the incoming one.
 */
641 if (ff->ff_layout_version == 0 ||
642 oa->o_layout_version & LU_LAYOUT_RESYNC) {
643 /* if LU_LAYOUT_RESYNC is set, it closes the era of
644 * writing. Only mirror I/O can write this object. */
645 ff->ff_layout_version = oa->o_layout_version;
647 } else if (oa->o_layout_version > ff->ff_layout_version) {
648 ff->ff_range = max_t(__u32, ff->ff_range,
649 oa->o_layout_version -
650 ff->ff_layout_version);
/* Byte-swap for storage only when something actually changed. */
654 if (memcmp(ff, &fo->ofo_ff, sizeof(*ff)))
655 filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
663 * Set OFD object attributes.
665 * This function sets OFD object attributes taken from incoming request.
666 * It sets not only regular attributes but also XATTR_NAME_FID extended
667 * attribute if needed. The "fid" xattr allows the object's MDT parent inode
668 * to be found and verified by LFSCK and other tools in case of inconsistency.
670 * \param[in] env execution environment
671 * \param[in] fo OFD object
672 * \param[in] la object attributes
673 * \param[in] oa obdo carries fid, ost_layout, layout version
675 * \retval 0 if successful
676 * \retval negative value on error
/*
 * Order of work visible here: existence check, VBR version check,
 * ownership handling (ofd_attr_handle_id with is_setattr = 1),
 * transaction create and declarations, transaction start, then under
 * the object write lock: attribute set, filter_fid computation and,
 * when needed, the XATTR_NAME_FID update. The per-thread buffer
 * info->fti_mds_fid is used as the filter_fid scratch area.
 */
678 int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
679 struct lu_attr *la, struct obdo *oa)
681 struct ofd_thread_info *info = ofd_info(env);
682 struct ofd_device *ofd = ofd_obj2dev(fo);
683 struct filter_fid *ff = &info->fti_mds_fid;
689 if (!ofd_object_exists(fo))
690 GOTO(out, rc = -ENOENT);
692 /* VBR: version recovery check */
693 rc = ofd_version_get_check(info, fo);
697 rc = ofd_attr_handle_id(env, fo, la, 1 /* is_setattr */);
701 th = ofd_trans_create(env, ofd);
703 GOTO(out, rc = PTR_ERR(th));
705 rc = dt_declare_attr_set(env, ofd_object_child(fo), la, th);
/*
 * The xattr update is declared with the full filter_fid size before
 * ofd_object_ff_update() decides whether it is needed.
 */
709 info->fti_buf.lb_buf = ff;
710 info->fti_buf.lb_len = sizeof(*ff);
711 rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
712 XATTR_NAME_FID, 0, th);
/* The object is handed to ofd_trans_start() only for a size change. */
716 rc = ofd_trans_start(env, ofd, la->la_valid & LA_SIZE ? fo : NULL, th);
720 ofd_write_lock(env, fo);
/* Existence is checked again now that the write lock is held. */
721 if (!ofd_object_exists(fo))
722 GOTO(unlock, rc = -ENOENT);
724 /* serialize vs ofd_commitrw_write() */
725 if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
726 tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
729 rc = dt_attr_set(env, ofd_object_child(fo), la, th);
/*
 * fl is the result of ofd_object_ff_update(): an error, 0, or the
 * xattr flag that is passed to dt_xattr_set() below.
 */
733 fl = ofd_object_ff_update(env, fo, oa, ff);
735 GOTO(unlock, rc = fl);
/* Fail-checks that alter the parent FID oid before it is stored. */
738 if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
739 ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
740 else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
741 le32_add_cpu(&ff->ff_parent.f_oid, -1);
742 else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
745 info->fti_buf.lb_buf = ff;
746 info->fti_buf.lb_len = sizeof(*ff);
747 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
748 XATTR_NAME_FID, fl, th);
/* The cached ofo_ff is refreshed in CPU order from the LE buffer. */
750 filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
756 ofd_write_unlock(env, fo);
/* The result of stopping the transaction is kept separately as rc2. */
758 rc2 = ofd_trans_stop(env, ofd, th, rc);
760 CERROR("%s: failed to stop transaction: rc = %d\n",
769 * Fallocate(Preallocate) space for OFD object.
771 * This function allocates space for the object from the \a start
772 * offset to the \a end offset.
774 * \param[in] env execution environment
775 * \param[in] fo OFD object
776 * \param[in] start start offset to allocate from
777 * \param[in] end end of allocate
778 * \param[in] mode fallocate mode
779 * \param[in] la object attributes
780 * \param[in] oa obdo struct from incoming request
782 * \retval 0 if successful
783 * \retval negative value on error
/*
 * Order of work visible here: existence check, VBR version check,
 * load of the cached filter_fid, optional construction of a new
 * filter_fid from oa (tracked by ff_needed), transaction create and
 * declarations, transaction start, then under the object lock:
 * dt_falloc(), attribute set and the XATTR_NAME_FID write.
 */
785 int ofd_object_fallocate(const struct lu_env *env, struct ofd_object *fo,
786 __u64 start, __u64 end, int mode, struct lu_attr *la,
789 struct ofd_thread_info *info = ofd_info(env);
790 struct ofd_device *ofd = ofd_obj2dev(fo);
791 struct dt_object *dob = ofd_object_child(fo);
793 struct filter_fid *ff = &info->fti_mds_fid;
794 bool ff_needed = false;
799 if (!ofd_object_exists(fo))
802 /* VBR: version recovery check */
803 rc = ofd_version_get_check(info, fo);
808 rc = ofd_object_ff_load(env, fo);
/*
 * The filter_fid is filled directly from oa and converted to LE here;
 * ofd_object_ff_update() is not used in this function.
 */
815 if (oa->o_valid & OBD_MD_FLFID) {
816 ff->ff_parent.f_seq = oa->o_parent_seq;
817 ff->ff_parent.f_oid = oa->o_parent_oid;
818 ff->ff_parent.f_ver = oa->o_stripe_idx;
820 if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
821 ff->ff_layout = oa->o_layout;
822 if (oa->o_valid & OBD_MD_LAYOUT_VERSION)
823 ff->ff_layout_version = oa->o_layout_version;
824 filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
828 th = ofd_trans_create(env, ofd);
832 rc = dt_declare_attr_set(env, dob, la, th);
836 rc = dt_declare_fallocate(env, dob, start, end, mode, th);
841 info->fti_buf.lb_buf = ff;
842 info->fti_buf.lb_len = sizeof(*ff);
843 rc = dt_declare_xattr_set(env, ofd_object_child(fo),
844 &info->fti_buf, XATTR_NAME_FID, 0,
850 rc = ofd_trans_start(env, ofd, fo, th);
/*
 * NOTE(review): a read lock is taken here although fo->ofo_ff is
 * written further down by filter_fid_le_to_cpu(); ofd_attr_set() and
 * ofd_object_punch() take the write lock for the same update - confirm
 * this is intended.
 */
854 ofd_read_lock(env, fo);
/* Existence is checked again now that the lock is held. */
855 if (!ofd_object_exists(fo))
856 GOTO(unlock, rc = -ENOENT);
858 if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
859 tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
862 rc = dt_falloc(env, dob, start, end, mode, th);
866 rc = dt_attr_set(env, dob, la, th);
/* The xattr is written with flags 0 and the cache refreshed from it. */
871 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
872 XATTR_NAME_FID, 0, th);
874 filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
877 ofd_read_unlock(env, fo);
/*
 * NOTE(review): the result of ofd_trans_stop() is discarded here, while
 * ofd_attr_set(), ofd_object_punch() and ofd_destroy() capture it in
 * rc2 and log it - confirm.
 */
879 ofd_trans_stop(env, ofd, th, rc);
884 * Truncate/punch OFD object.
886 * This function frees all of the allocated object's space from the \a start
887 * offset to the \a end offset. For truncate() operations the \a end offset
888 * is OBD_OBJECT_EOF. The functionality to punch holes in an object via
889 * fallocate(FALLOC_FL_PUNCH_HOLE) is not yet implemented (see LU-3606).
891 * \param[in] env execution environment
892 * \param[in] fo OFD object
893 * \param[in] start start offset to punch from
894 * \param[in] end end of punch
895 * \param[in] la object attributes
896 * \param[in] oa obdo struct from incoming request
898 * \retval 0 if successful
899 * \retval negative value on error
/*
 * Order of work visible here: existence check, optional parent FID
 * verification, VBR version check, ownership handling
 * (ofd_attr_handle_id with is_setattr = 0), transaction create and
 * declarations, transaction start, then under the object write lock:
 * layout version verification, dt_punch(), filter_fid computation,
 * attribute set and, when needed, the XATTR_NAME_FID update.
 */
901 int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
902 __u64 start, __u64 end, struct lu_attr *la,
905 struct ofd_thread_info *info = ofd_info(env);
906 struct ofd_device *ofd = ofd_obj2dev(fo);
907 struct dt_object *dob = ofd_object_child(fo);
908 struct filter_fid *ff = &info->fti_mds_fid;
914 /* we support truncate, not punch yet */
915 LASSERT(end == OBD_OBJECT_EOF);
917 if (!ofd_object_exists(fo))
918 GOTO(out, rc = -ENOENT);
/* Parent FID verification runs only when ofd_lfsck_verify_pfid is set. */
920 if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
921 rc = ofd_verify_ff(env, fo, oa);
926 /* VBR: version recovery check */
927 rc = ofd_version_get_check(info, fo);
931 rc = ofd_attr_handle_id(env, fo, la, 0 /* !is_setattr */);
935 th = ofd_trans_create(env, ofd);
937 GOTO(out, rc = PTR_ERR(th));
939 if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & LUSTRE_ENCRYPT_FL) {
940 /* punch must be aware we are dealing with an encrypted file */
941 la->la_valid |= LA_FLAGS;
942 la->la_flags |= LUSTRE_ENCRYPT_FL;
944 rc = dt_declare_attr_set(env, dob, la, th);
/* Declared and executed up to OBD_OBJECT_EOF, matching the LASSERT above. */
948 rc = dt_declare_punch(env, dob, start, OBD_OBJECT_EOF, th);
952 info->fti_buf.lb_buf = ff;
953 info->fti_buf.lb_len = sizeof(*ff);
954 rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
955 XATTR_NAME_FID, 0, th);
959 rc = ofd_trans_start(env, ofd, fo, th);
963 ofd_write_lock(env, fo);
/*
 * NOTE(review): tgt_fmd_update() runs before the existence re-check
 * below, whereas ofd_attr_set() re-checks existence first - confirm the
 * ordering is intended.
 */
965 if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
966 tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
969 if (!ofd_object_exists(fo))
970 GOTO(unlock, rc = -ENOENT);
972 /* need to verify layout version */
973 if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
974 rc = ofd_verify_layout_version(env, fo, oa);
979 rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th);
/*
 * fl is the result of ofd_object_ff_update(): an error, 0, or the
 * xattr flag that is passed to dt_xattr_set() below.
 */
983 fl = ofd_object_ff_update(env, fo, oa, ff);
985 GOTO(unlock, rc = fl);
987 rc = dt_attr_set(env, dob, la, th);
/* Fail-checks that alter the parent FID oid before it is stored. */
992 if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
993 ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
994 else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
995 le32_add_cpu(&ff->ff_parent.f_oid, -1);
996 else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
999 info->fti_buf.lb_buf = ff;
1000 info->fti_buf.lb_len = sizeof(*ff);
1001 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
1002 XATTR_NAME_FID, fl, th);
/* The cached ofo_ff is refreshed in CPU order from the LE buffer. */
1004 filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
1010 ofd_write_unlock(env, fo);
/* The result of stopping the transaction is kept separately as rc2. */
1012 rc2 = ofd_trans_stop(env, ofd, th, rc);
1014 CERROR("%s: failed to stop transaction: rc = %d\n",
1015 ofd_name(ofd), rc2);
1023 * Destroy OFD object.
1025 * This function destroys an OFD object. If the object was never used (orphan),
1026 * a local transaction is used, which means the transaction data is not
1027 * returned in the reply.
1029 * \param[in] env execution environment
1030 * \param[in] fo OFD object
1031 * \param[in] orphan flag to indicate that object is orphaned
1033 * \retval 0 if successful
1034 * \retval negative value on error
/*
 * Order of work visible here: existence check, transaction create,
 * declaration of the reference drop and of the destroy, transaction
 * start, then under the object write lock: fmd drop, dt_ref_del() and
 * dt_destroy().
 */
1036 int ofd_destroy(const struct lu_env *env, struct ofd_object *fo,
1039 struct ofd_device *ofd = ofd_obj2dev(fo);
1046 if (!ofd_object_exists(fo))
1047 GOTO(out, rc = -ENOENT);
1049 th = ofd_trans_create(env, ofd);
1051 GOTO(out, rc = PTR_ERR(th));
1053 rc = dt_declare_ref_del(env, ofd_object_child(fo), th);
1057 rc = dt_declare_destroy(env, ofd_object_child(fo), th);
/*
 * Two ways of starting the transaction; presumably the local start is
 * chosen for orphans, as the function description says (TODO confirm
 * the selecting condition).
 */
1062 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
1064 rc = ofd_trans_start(env, ofd, NULL, th);
1068 ofd_write_lock(env, fo);
/* Existence is checked again now that the write lock is held. */
1069 if (!ofd_object_exists(fo))
1070 GOTO(unlock, rc = -ENOENT);
1072 tgt_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid);
/*
 * NOTE(review): the results of dt_ref_del() and dt_destroy() are not
 * assigned on these lines - confirm that failures here are meant to be
 * ignored.
 */
1074 dt_ref_del(env, ofd_object_child(fo), th);
1075 dt_destroy(env, ofd_object_child(fo), th);
1077 ofd_write_unlock(env, fo);
/* The result of stopping the transaction is kept separately as rc2. */
1079 rc2 = ofd_trans_stop(env, ofd, th, rc);
1081 CERROR("%s failed to stop transaction: %d\n",
1082 ofd_name(ofd), rc2);
1090 * Get OFD object attributes.
1092 * This function gets OFD object regular attributes. It is used to serve
1093 * incoming request as well as for local OFD purposes.
1095 * \param[in] env execution environment
1096 * \param[in] fo OFD object
1097 * \param[in] la object attributes
1099 * \retval 0 if successful
1100 * \retval negative value on error
1102 int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo,
1109 if (ofd_object_exists(fo)) {
1110 rc = dt_attr_get(env, ofd_object_child(fo), la);