4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/ofd/ofd_objects.c
34 * This file contains OSD API methods related to OBD Filter Device (OFD)
37 * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
38 * Author: Mikhail Pershin <mike.pershin@intel.com>
41 #define DEBUG_SUBSYSTEM S_FILTER
43 #include <dt_object.h>
44 #include <lustre_lfsck.h>
46 #include "ofd_internal.h"
49 * Get object version from disk and check it.
51 * This function checks object version from disk with
52 * ofd_thread_info::fti_pre_version filled from incoming RPC. This is part of
53 * VBR (Version-Based Recovery) and ensures that object has the same version
54 * upon replay as it has during original modification.
56 * \param[in] info execution thread OFD private data
57 * \param[in] fo OFD object
59 * \retval 0 if version matches
60 * \retval -EOVERFLOW on version mismatch
/* VBR check: compare the object's on-disk version with
 * info->fti_pre_version and flag the export on a mismatch.
 * NOTE(review): this listing drops some original lines (the embedded line
 * numbers skip), so braces and return statements are not visible here. */
62 static int ofd_version_get_check(struct ofd_thread_info *info,
63 struct ofd_object *fo)
65 dt_obj_version_t curr_version;
/* The caller must pass an object that exists. */
67 LASSERT(ofd_object_exists(fo));
/* No export attached to this thread info; the statement guarded by this
 * test is not visible in this listing (presumably an early return). */
69 if (info->fti_exp == NULL)
/* Read the current version through the child dt_object. */
72 curr_version = dt_version_get(info->fti_env, ofd_object_child(fo));
/* The version is reinterpreted as signed and compared with -EOPNOTSUPP;
 * presumably the backend signals "versions unsupported" this way. */
73 if ((__s64)curr_version == -EOPNOTSUPP)
75 /* VBR: version is checked always because costs nothing */
/* A zero fti_pre_version is never compared against the disk version. */
76 if (info->fti_pre_version != 0 &&
77 info->fti_pre_version != curr_version) {
78 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
79 info->fti_pre_version, curr_version);
/* exp_vbr_failed is set while holding the export's exp_lock. */
80 spin_lock(&info->fti_exp->exp_lock);
81 info->fti_exp->exp_vbr_failed = 1;
82 spin_unlock(&info->fti_exp->exp_lock);
/* Store the version just read back into fti_pre_version. */
85 info->fti_pre_version = curr_version;
90 * Get OFD object by FID.
92 * This function finds OFD slice of compound object with the given FID.
94 * \param[in] env execution environment
95 * \param[in] ofd OFD device
96 * \param[in] fid FID of the object
98 * \retval pointer to the found ofd_object
99 * \retval ERR_PTR(errno) in case of error
/* Look up the lu_object with the given FID on the OFD's lu_device and
 * return it as an ofd_object, or as an error pointer on failure.
 * NOTE(review): the declaration of 'o' and the success-path assignment to
 * 'fo' are on lines that this listing does not show. */
101 struct ofd_object *ofd_object_find(const struct lu_env *env,
102 struct ofd_device *ofd,
103 const struct lu_fid *fid)
105 struct ofd_object *fo;
/* The lookup goes through the lu_device embedded in ofd->ofd_dt_dev;
 * the last argument (a configuration pointer, presumably) is NULL. */
110 o = lu_object_find(env, &ofd->ofd_dt_dev.dd_lu_dev, fid, NULL);
111 if (likely(!IS_ERR(o)))
/* Failure path: re-type the error pointer without changing its value. */
114 fo = ERR_CAST(o); /* return error */
120 * Get FID of parent MDT object.
122 * This function reads extended attribute XATTR_NAME_FID of OFD object which
123 * contains the MDT parent object FID and saves it in ofd_object::ofo_ff.
125 * The filter_fid::ff_parent::f_ver field currently holds
126 * the OST-object index in the parent MDT-object's layout EA,
127 * not the actual FID::f_ver of the parent. We therefore access
128 * it via the macro f_stripe_idx.
130 * \param[in] env execution environment
131 * \param[in] fo OFD object
133 * \retval 0 if successful
134 * \retval -ENODATA if there is no such xattr
135 * \retval negative value on error
/* Read the XATTR_NAME_FID xattr of the object into fo->ofo_ff, using the
 * per-thread lu_buf (info->fti_buf) as the transfer descriptor.
 * NOTE(review): several original lines (declaration of rc, the lb_buf
 * assignment, the error checks and returns) are not visible here. */
137 int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo)
139 struct ofd_thread_info *info = ofd_info(env);
140 struct filter_fid *ff = &fo->ofo_ff;
141 struct lu_buf *buf = &info->fti_buf;
/* ofo_ff already holds a sane parent FID; presumably the cached copy is
 * reused and the xattr is not read again (guarded statement not shown). */
144 if (fid_is_sane(&ff->ff_parent))
/* Ask for at most sizeof(struct filter_fid) bytes. */
148 buf->lb_len = sizeof(*ff);
149 rc = dt_xattr_get(env, ofd_object_child(fo), buf, XATTR_NAME_FID);
/* Fewer bytes than a struct lu_fid came back: clear the parent FID.
 * NOTE(review): rc is compared with a size_t, so a negative rc would
 * compare as a huge unsigned value; this relies on negative rc being
 * handled on the lines not visible above this test - confirm. */
153 if (unlikely(rc < sizeof(struct lu_fid))) {
154 fid_zero(&ff->ff_parent);
/* Convert the rc bytes just read from little-endian to CPU order,
 * in place (source and destination are both ff). */
158 filter_fid_le_to_cpu(ff, ff, rc);
164 * Precreate the given number \a nr of objects in the given sequence \a oseq.
166 * This function precreates new OST objects in the given sequence.
167 * The precreation starts from \a id and creates \a nr objects sequentially.
170 * This function may create fewer objects than requested.
172 * We mark object SUID+SGID to flag it for accepting UID+GID from client on
173 * first write. Currently the permission bits on the OST are never used,
176 * Initialize a/c/m time so any client timestamp will always be newer and
177 * update the inode. The ctime = 0 case is also handled specially in
178 * osd_inode_setattr(). See LU-221, LU-1042 for details.
180 * \param[in] env execution environment
181 * \param[in] ofd OFD device
182 * \param[in] id object ID to start precreation from
183 * \param[in] oseq object sequence
184 * \param[in] nr number of objects to precreate
185 * \param[in] sync synchronous precreation flag
187 * \retval 0 if successful
188 * \retval negative value on error
/* Precreate up to nr objects with IDs id .. id + nr - 1 in sequence oseq,
 * in one transaction: find and lock the objects, declare the LAST_ID write
 * and the creations, start the transaction, write LAST_ID, create.
 * NOTE(review): the final statement returns the number of created objects
 * when it is positive and rc otherwise, while the header text above this
 * function says "0 if successful" - callers should expect a count.
 * NOTE(review): this listing drops many original lines (declarations of
 * rc, i, th, tmp, objects, nr_saved, several error tests, loop bodies'
 * bookkeeping and all labels), so only the visible statements are
 * described below. */
190 int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
191 u64 id, struct ofd_seq *oseq, int nr, int sync)
193 struct ofd_thread_info *info = ofd_info(env);
194 struct ofd_object *fo = NULL;
195 struct dt_object *next;
197 struct ofd_object **batch;
198 struct lu_fid *fid = &info->fti_fid;
208 /* Don't create objects beyond the valid range for this SEQ */
/* NOTE(review): both CERROR calls below pass (id, seq) to DOSTID, whereas
 * the CDEBUG further down prints seq before id; DOSTID's definition is not
 * visible here - confirm the expected argument order. */
209 if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
210 (id + nr) > IDIF_MAX_OID)) {
211 CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n",
212 ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
213 RETURN(rc = -ENOSPC);
214 } else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
215 (id + nr) > OBIF_MAX_OID)) {
216 CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n",
217 ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
218 RETURN(rc = -ENOSPC);
/* One pointer slot per object; released with the same size expression at
 * the end. nr_saved is assigned on a line not visible here (presumably it
 * holds the originally requested nr). */
221 OBD_ALLOC(batch, nr_saved * sizeof(struct ofd_object *));
/* New objects are regular files carrying SUID, SGID and sticky bits plus
 * mode 0666; ofd_attr_handle_id() tests these three bits to decide whether
 * UID, GID and PROJID may still be initialised. */
225 info->fti_attr.la_valid = LA_TYPE | LA_MODE;
226 info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | S_ISVTX | 0666;
227 info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
/* All three timestamps start at zero. */
229 info->fti_attr.la_valid |= LA_ATIME | LA_MTIME | LA_CTIME;
230 info->fti_attr.la_atime = 0;
231 info->fti_attr.la_mtime = 0;
232 info->fti_attr.la_ctime = 0;
236 /* prepare objects */
/* Start from the FID of the LAST_ID object and substitute only the object
 * id on each iteration. */
237 *fid = *lu_object_fid(&oseq->os_lastid_obj->do_lu);
238 for (i = 0; i < nr; i++) {
239 rc = fid_set_id(fid, id + i);
248 fo = ofd_object_find(env, ofd, fid);
251 GOTO(out, rc = PTR_ERR(fo));
/* Every object found is write-locked here; the cleanup loop at the end
 * unlocks and puts each one. */
257 ofd_write_lock(env, fo);
/* fti_buf points at the local 'tmp', which later carries the
 * little-endian LAST_ID value to write. */
260 info->fti_buf.lb_buf = &tmp;
261 info->fti_buf.lb_len = sizeof(tmp);
264 th = ofd_trans_create(env, ofd);
266 GOTO(out, rc = PTR_ERR(th));
/* Declare the LAST_ID record write. */
270 rc = dt_declare_record_write(env, oseq->os_lastid_obj, &info->fti_buf,
273 GOTO(trans_stop, rc);
/* Declare phase: one create per object that does not exist yet. */
275 for (i = 0; i < nr; i++) {
279 if (unlikely(ofd_object_exists(fo))) {
280 /* object may exist being re-created by write replay */
281 CDEBUG(D_INODE, "object %#llx/%#llx exists: "
282 DFID"\n", ostid_seq(&oseq->os_oi), id,
283 PFID(lu_object_fid(&fo->ofo_obj.do_lu)));
287 next = ofd_object_child(fo);
288 LASSERT(next != NULL);
290 rc = dt_declare_create(env, next, &info->fti_attr, NULL,
294 GOTO(trans_stop, rc);
/* The transaction is started with the local variant. */
301 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
303 GOTO(trans_stop, rc);
305 CDEBUG(D_OTHER, "%s: create new object "DFID" nr %d\n",
306 ofd_name(ofd), PFID(fid), nr);
308 /* When LFSCK scans the whole device to verify the LAST_ID file
309 * consistency, it first loads the last_id into RAM and compares
310 * the last_id with each OST-object's ID. If the latter is larger,
311 * it regards the LAST_ID file as crashed. But during the LFSCK
312 * scanning, the OFD may continue to create new OST-objects. Those newly
313 * created OST-objects will have larger IDs than the ones LFSCK knows.
314 * So from the LFSCK view, it needs to re-load the last_id from disk
315 * file, and if the latest last_id is still smaller than the object's
316 * ID, then the LAST_ID file has really crashed.
318 * For the above mechanism to work, before the OFD pre-creates OST-objects,
319 * it must update the LAST_ID file first; otherwise LFSCK
320 * may not see the latest last_id even though new OST-objects were created. */
/* Fault injection OBD_FAIL_LFSCK_SKIP_LASTID skips this early update. The
 * value written is the highest ID that is about to be created. */
321 if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_SKIP_LASTID)) {
322 tmp = cpu_to_le64(id + nr - 1);
323 dt_write_lock(env, oseq->os_lastid_obj, 0);
324 rc = dt_record_write(env, oseq->os_lastid_obj,
325 &info->fti_buf, &info->fti_off, th);
326 dt_write_unlock(env, oseq->os_lastid_obj);
328 GOTO(trans_stop, rc);
/* Execute phase. */
331 for (i = 0; i < nr; i++) {
335 /* Only the new created objects need to be recorded. */
336 if (ofd->ofd_osd->dd_record_fid_accessed) {
337 struct lfsck_req_local *lrl = &ofd_info(env)->fti_lrl;
339 lfsck_pack_rfa(lrl, lu_object_fid(&fo->ofo_obj.do_lu),
340 LEL_FID_ACCESSED, LFSCK_TYPE_LAYOUT);
341 lfsck_in_notify_local(env, ofd->ofd_osd, lrl, NULL);
/* Create only when the object does not exist and the
 * OBD_FAIL_LFSCK_DANGLING fault injection is not active. */
344 if (likely(!ofd_object_exists(fo) &&
345 !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING))) {
346 next = ofd_object_child(fo);
347 LASSERT(next != NULL);
349 rc = dt_create(env, next, &info->fti_attr, NULL,
353 GOTO(trans_stop, rc);
358 LASSERT(ofd_object_exists(fo));
/* Advance the in-memory last object ID of the sequence. */
360 ofd_seq_last_oid_set(oseq, id + i);
364 /* NOT all the wanted objects have been created,
365 * set the LAST_ID as the real created. */
366 if (unlikely(objects < nr)) {
/* Rewrite LAST_ID with the in-memory last OID; a failure here is only
 * logged (rc1 is separate from rc). */
370 tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
371 dt_write_lock(env, oseq->os_lastid_obj, 0);
372 rc1 = dt_record_write(env, oseq->os_lastid_obj,
373 &info->fti_buf, &info->fti_off, th);
374 dt_write_unlock(env, oseq->os_lastid_obj);
376 CERROR("%s: fail to reset the LAST_ID for seq (%#llx"
377 ") from %llu to %llu\n", ofd_name(ofd),
378 ostid_seq(&oseq->os_oi), id + nr - 1,
379 ofd_seq_last_oid(oseq));
/* The current rc is passed into the transaction stop; its own result is
 * kept in rc2. */
383 rc2 = ofd_trans_stop(env, ofd, th, rc);
385 CERROR("%s: failed to stop transaction: rc = %d\n",
/* Cleanup: unlock and release the objects, then free the batch array. */
390 for (i = 0; i < nr_saved; i++) {
393 ofd_write_unlock(env, fo);
394 ofd_object_put(env, fo);
397 OBD_FREE(batch, nr_saved * sizeof(struct ofd_object *));
/* Zero created objects together with rc == 0 is logged at D_ERROR, and the
 * assertion below requires rc < 0 whenever nothing was created. */
399 CDEBUG((objects == 0 && rc == 0) ? D_ERROR : D_OTHER,
400 "created %d/%d objects: %d\n", objects, nr_saved, rc);
402 LASSERT(ergo(objects == 0, rc < 0));
/* Positive count of created objects if any, otherwise rc. */
403 RETURN(objects > 0 ? objects : rc);
407 * Fix the OFD object ownership.
409 * If the object still has SUID+SGID bits set, meaning that it was precreated
410 * by the MDT before it was assigned to any file, (see ofd_precreate_objects())
411 * then we will accept the UID/GID/PROJID if sent by the client for initializing
412 * the ownership of this object. We only allow this to happen once (so clear
413 * these bits) and later only allow setattr.
415 * \param[in] env execution environment
416 * \param[in] fo OFD object
417 * \param[in] la object attributes
418 * \param[in] is_setattr was this function called from setattr or not
420 * \retval 0 if successful
421 * \retval negative value on error
/* Decide which of UID/GID/PROJID in 'la' may be applied, based on the
 * SUID/SGID/sticky bits currently set in the object's mode, and clear the
 * bits that correspond to the IDs being initialised.
 * NOTE(review): the declarations of rc and mask, the 'is_setattr' test, the
 * statements that accumulate 'mask' and all returns are on lines that this
 * listing does not show. */
423 int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo,
424 struct lu_attr *la, int is_setattr)
426 struct ofd_thread_info *info = ofd_info(env);
427 struct lu_attr *ln = &info->fti_attr2;
/* None of UID, GID, PROJID is being set; the guarded statement is not
 * visible (presumably an early return). */
433 if (!(la->la_valid & LA_UID) && !(la->la_valid & LA_GID) &&
434 !(la->la_valid & LA_PROJID))
/* Fetch the current attributes into the per-thread fti_attr2. */
437 rc = dt_attr_get(env, ofd_object_child(fo), ln);
441 LASSERT(ln->la_valid & LA_MODE);
444 * Only allow setattr to change UID/GID/PROJID, if
445 * SUID+SGID is not set which means this is not
446 * initialization of this objects.
/* An ID whose marker bit is already clear is dropped from la_valid:
 * S_ISUID gates LA_UID, S_ISGID gates LA_GID, S_ISVTX gates LA_PROJID. */
449 if (!(ln->la_mode & S_ISUID))
450 la->la_valid &= ~LA_UID;
451 if (!(ln->la_mode & S_ISGID))
452 la->la_valid &= ~LA_GID;
453 if (!(ln->la_mode & S_ISVTX))
454 la->la_valid &= ~LA_PROJID;
457 /* Initialize ownership of this object, clear SUID+SGID bits*/
/* The statements guarded by these three tests are not visible; presumably
 * each adds the matching mode bit to 'mask'. */
458 if ((la->la_valid & LA_UID) && (ln->la_mode & S_ISUID))
460 if ((la->la_valid & LA_GID) && (ln->la_mode & S_ISGID))
462 if ((la->la_valid & LA_PROJID) && (ln->la_mode & S_ISVTX))
/* When the caller supplied no mode, or this is not a setattr, start from
 * the on-disk mode and mark LA_MODE valid so the mode gets written. */
465 if (!(la->la_valid & LA_MODE) || !is_setattr) {
466 la->la_mode = ln->la_mode;
467 la->la_valid |= LA_MODE;
/* Remove the bits collected in 'mask' from the mode to be applied. */
469 la->la_mode &= ~mask;
476 * Check if it needs to update filter_fid by the value of @oa.
479 * \param[in] fo ofd object
480 * \param[in] oa obdo from client or MDT
481 * \param[out] ff if filter_fid needs updating, this field is used to
482 * return the new buffer
484 * \retval < 0 error occurred
485 * \retval 0 doesn't need to update filter_fid
486 * \retval FL_XATTR_{CREATE,REPLACE} flag for xattr update
/* Build in 'ff' the filter_fid that should be stored for this object,
 * starting from the cached fo->ofo_ff (or zeroes when the xattr is absent)
 * and applying the parent FID, layout and layout version carried by 'oa'.
 * NOTE(review): the first half of the opening o_valid test, the declaration
 * of rc, every return and the second half of the connect-flags test are on
 * lines that this listing does not show. */
488 int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo,
489 const struct obdo *oa, struct filter_fid *ff)
495 (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION)))
/* -ENODATA (no xattr yet) is tolerated; any other negative rc is not. */
498 rc = ofd_object_ff_load(env, fo);
499 if (rc < 0 && rc != -ENODATA)
/* 'ff' must be a separate buffer: it is modified below and then compared
 * with the cached fo->ofo_ff. */
502 LASSERT(ff != &fo->ofo_ff);
/* No xattr: create it from a zeroed ff. Otherwise replace it, starting
 * from a copy of the cached value. */
503 if (rc == -ENODATA) {
504 rc = LU_XATTR_CREATE;
505 memset(ff, 0, sizeof(*ff));
507 rc = LU_XATTR_REPLACE;
508 memcpy(ff, &fo->ofo_ff, sizeof(*ff));
511 if (oa->o_valid & OBD_MD_FLFID) {
512 /* packing fid and converting it to LE for storing into EA.
513 * Here ->o_stripe_idx should be filled by LOV and rest of
514 * fields - by client. */
515 ff->ff_parent.f_seq = oa->o_parent_seq;
516 ff->ff_parent.f_oid = oa->o_parent_oid;
517 /* XXX: we are ignoring o_parent_ver here, since this should
518 * be the same for all objects in this fileset. */
519 ff->ff_parent.f_ver = oa->o_stripe_idx;
521 if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
522 ff->ff_layout = oa->o_layout;
524 if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
525 CDEBUG(D_INODE, DFID": OST("DFID") layout version %u -> %u\n",
526 PFID(&fo->ofo_ff.ff_parent),
527 PFID(lu_object_fid(&fo->ofo_obj.do_lu)),
528 ff->ff_layout_version, oa->o_layout_version);
530 /* only the MDS has the authority to update layout version */
/* The flag tested against the export's connect flags is on a line that
 * is not visible here. */
531 if (!(exp_connect_flags(ofd_info(env)->fti_exp) &
533 CERROR(DFID": update layout version from client\n",
534 PFID(&fo->ofo_ff.ff_parent));
539 if (ff->ff_layout_version & LU_LAYOUT_RESYNC) {
540 /* this opens a new era of writing */
541 ff->ff_layout_version = 0;
545 /* it's not allowed to change it to a smaller value */
546 if (oa->o_layout_version < ff->ff_layout_version)
549 if (ff->ff_layout_version == 0 ||
550 oa->o_layout_version & LU_LAYOUT_RESYNC) {
551 /* if LU_LAYOUT_RESYNC is set, it closes the era of
552 * writing. Only mirror I/O can write this object. */
553 ff->ff_layout_version = oa->o_layout_version;
/* Otherwise a larger incoming version only widens ff_range to the
 * distance between the incoming and the stored version. */
555 } else if (oa->o_layout_version > ff->ff_layout_version) {
556 ff->ff_range = MAX(ff->ff_range,
557 oa->o_layout_version - ff->ff_layout_version);
/* ff differs from the cached copy: convert it in place to little-endian.
 * The callers visible in this file pass ff to dt_xattr_set() afterwards. */
561 if (memcmp(ff, &fo->ofo_ff, sizeof(*ff)))
562 filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
570 * Set OFD object attributes.
572 * This function sets OFD object attributes taken from incoming request.
573 * It sets not only regular attributes but also XATTR_NAME_FID extended
574 * attribute if needed. The "fid" xattr allows the object's MDT parent inode
575 * to be found and verified by LFSCK and other tools in case of inconsistency.
577 * \param[in] env execution environment
578 * \param[in] fo OFD object
579 * \param[in] la object attributes
580 * \param[in] oa obdo carries fid, ost_layout, layout version
582 * \retval 0 if successful
583 * \retval negative value on error
/* Apply the attributes in 'la' to the object and, when
 * ofd_object_ff_update() asks for it, rewrite the XATTR_NAME_FID xattr in
 * the same transaction, all under the object's write lock.
 * NOTE(review): the declarations of th, rc, rc2 and fl, most 'if (rc)' tests,
 * the 'if (fl)' tests, all labels and the final return are on lines that
 * this listing does not show. */
585 int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
586 struct lu_attr *la, struct obdo *oa)
588 struct ofd_thread_info *info = ofd_info(env);
589 struct ofd_device *ofd = ofd_obj2dev(fo);
590 struct filter_fid *ff = &info->fti_mds_fid;
592 struct ofd_mod_data *fmd;
598 ofd_write_lock(env, fo);
599 if (!ofd_object_exists(fo))
600 GOTO(unlock, rc = -ENOENT);
/* When any timestamp is being set, raise fmd_mactime_xid of the
 * ofd_mod_data looked up for this export and object FID to the xid of
 * this request, if it was lower. */
602 if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
603 fmd = ofd_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid);
604 if (fmd && fmd->fmd_mactime_xid < info->fti_xid)
605 fmd->fmd_mactime_xid = info->fti_xid;
606 ofd_fmd_put(info->fti_exp, fmd);
609 /* VBR: version recovery check */
610 rc = ofd_version_get_check(info, fo);
614 rc = ofd_attr_handle_id(env, fo, la, 1 /* is_setattr */);
/* fl receives the xattr flag computed by ofd_object_ff_update(): a
 * negative value is an error and is propagated as rc below. */
618 fl = ofd_object_ff_update(env, fo, oa, ff);
620 GOTO(unlock, rc = fl);
622 th = ofd_trans_create(env, ofd);
624 GOTO(unlock, rc = PTR_ERR(th));
626 rc = dt_declare_attr_set(env, ofd_object_child(fo), la, th);
/* Fault injection: deliberately store a wrong parent oid. */
631 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
632 ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
633 else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
634 le32_add_cpu(&ff->ff_parent.f_oid, -1);
636 info->fti_buf.lb_buf = ff;
637 info->fti_buf.lb_len = sizeof(*ff);
638 rc = dt_declare_xattr_set(env, ofd_object_child(fo),
639 &info->fti_buf, XATTR_NAME_FID, fl,
/* The object is passed to ofd_trans_start() only when the size is among
 * the attributes being set; otherwise NULL is passed. */
645 rc = ofd_trans_start(env, ofd, la->la_valid & LA_SIZE ? fo : NULL, th);
649 rc = dt_attr_set(env, ofd_object_child(fo), la, th);
/* Fault injection OBD_FAIL_LFSCK_NOPFID; the guarded statement is not
 * visible (presumably it skips writing the xattr). */
654 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
/* fti_buf is pointed at ff again right before the xattr is written. */
657 info->fti_buf.lb_buf = ff;
658 info->fti_buf.lb_len = sizeof(*ff);
659 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
660 XATTR_NAME_FID, fl, th);
/* Refresh the cached fo->ofo_ff from ff, converted back to CPU order.
 * Whether this is conditional on rc is not visible here. */
662 filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
/* The current rc is handed to the transaction stop; its own result is
 * kept separately in rc2. */
668 rc2 = ofd_trans_stop(env, ofd, th, rc);
670 CERROR("%s: failed to stop transaction: rc = %d\n",
676 ofd_write_unlock(env, fo);
682 * Truncate/punch OFD object.
684 * This function frees all of the allocated object's space from the \a start
685 * offset to the \a end offset. For truncate() operations the \a end offset
686 * is OBD_OBJECT_EOF. The functionality to punch holes in an object via
687 * fallocate(FALLOC_FL_PUNCH_HOLE) is not yet implemented (see LU-3606).
689 * \param[in] env execution environment
690 * \param[in] fo OFD object
691 * \param[in] start start offset to punch from
692 * \param[in] end end of punch
693 * \param[in] la object attributes
694 * \param[in] oa obdo struct from incoming request
696 * \retval 0 if successful
697 * \retval negative value on error
/* Truncate the object from 'start' to end of file, apply the attributes in
 * 'la' and, when ofd_object_ff_update() asks for it, rewrite the
 * XATTR_NAME_FID xattr, all in one transaction under the write lock.
 * NOTE(review): the last parameter line, the declarations of th, rc, rc2
 * and fl, most 'if (rc)' tests, the 'if (fl)' tests, all labels and the
 * final return are on lines that this listing does not show. */
699 int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
700 __u64 start, __u64 end, struct lu_attr *la,
703 struct ofd_thread_info *info = ofd_info(env);
704 struct ofd_device *ofd = ofd_obj2dev(fo);
705 struct ofd_mod_data *fmd;
706 struct dt_object *dob = ofd_object_child(fo);
707 struct filter_fid *ff = &info->fti_mds_fid;
715 /* we support truncate, not punch yet */
/* Any 'end' other than OBD_OBJECT_EOF trips this assertion. */
716 LASSERT(end == OBD_OBJECT_EOF);
718 ofd_write_lock(env, fo);
/* Raise fmd_mactime_xid to this request's xid if it was lower. Unlike in
 * ofd_attr_set(), this runs unconditionally and before the existence
 * check. */
719 fmd = ofd_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid);
720 if (fmd && fmd->fmd_mactime_xid < info->fti_xid)
721 fmd->fmd_mactime_xid = info->fti_xid;
722 ofd_fmd_put(info->fti_exp, fmd);
724 if (!ofd_object_exists(fo))
725 GOTO(unlock, rc = -ENOENT);
/* Parent FID verification is done only when the device has
 * ofd_lfsck_verify_pfid set and the request carries a parent FID. */
727 if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
728 rc = ofd_verify_ff(env, fo, oa);
733 /* need to verify layout version */
734 if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
735 rc = ofd_verify_layout_version(env, fo, oa);
/* The flag is cleared after verification; presumably so that the later
 * ofd_object_ff_update() call, which tests this flag, does not touch the
 * stored layout version - confirm. */
739 oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
742 /* VBR: version recovery check */
743 rc = ofd_version_get_check(info, fo);
747 rc = ofd_attr_handle_id(env, fo, la, 0 /* !is_setattr */);
/* fl receives the xattr flag computed by ofd_object_ff_update(): a
 * negative value is an error and is propagated as rc below. */
751 fl = ofd_object_ff_update(env, fo, oa, ff);
753 GOTO(unlock, rc = fl);
755 th = ofd_trans_create(env, ofd);
757 GOTO(unlock, rc = PTR_ERR(th));
759 rc = dt_declare_attr_set(env, dob, la, th);
/* Both the declaration and the punch itself use OBD_OBJECT_EOF rather
 * than 'end'; the assertion above makes the two equal. */
763 rc = dt_declare_punch(env, dob, start, OBD_OBJECT_EOF, th);
/* Fault injection: deliberately store a wrong parent oid. */
768 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
769 ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
770 else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
771 le32_add_cpu(&ff->ff_parent.f_oid, -1);
773 info->fti_buf.lb_buf = ff;
774 info->fti_buf.lb_len = sizeof(*ff);
775 rc = dt_declare_xattr_set(env, ofd_object_child(fo),
776 &info->fti_buf, XATTR_NAME_FID, fl,
/* The object itself is always passed to ofd_trans_start() here. */
782 rc = ofd_trans_start(env, ofd, fo, th);
786 rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th);
790 rc = dt_attr_set(env, dob, la, th);
/* Fault injection OBD_FAIL_LFSCK_NOPFID; the guarded statement is not
 * visible (presumably it skips writing the xattr). */
795 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
/* NOTE(review): unlike ofd_attr_set(), fti_buf is not pointed at ff again
 * before this call; it relies on the lb_buf/lb_len set in the declare
 * phase still being intact - confirm nothing in between reuses fti_buf. */
798 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
799 XATTR_NAME_FID, fl, th);
/* Refresh the cached fo->ofo_ff from ff, converted back to CPU order.
 * Whether this is conditional on rc is not visible here. */
801 filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
/* The current rc is handed to the transaction stop; its own result is
 * kept separately in rc2. */
807 rc2 = ofd_trans_stop(env, ofd, th, rc);
809 CERROR("%s: failed to stop transaction: rc = %d\n",
814 ofd_write_unlock(env, fo);
820 * Destroy OFD object.
822 * This function destroys OFD object. If object wasn't used at all (orphan)
823 * then local transaction is used, which means the transaction data is not
824 * returned back in reply.
826 * \param[in] env execution environment
827 * \param[in] fo OFD object
828 * \param[in] orphan flag to indicate that object is orphaned
830 * \retval 0 if successful
831 * \retval negative value on error
/* Destroy the object inside a transaction, under the object's write lock:
 * declare ref_del and destroy, start the transaction, drop the object's
 * fmd, then delete the reference and destroy.
 * NOTE(review): the last parameter line, the declarations of th, rc and rc2,
 * the 'if (rc)' tests, the test that selects the transaction start variant,
 * all labels and the final return are on lines that this listing does not
 * show. */
833 int ofd_destroy(const struct lu_env *env, struct ofd_object *fo,
836 struct ofd_device *ofd = ofd_obj2dev(fo);
843 ofd_write_lock(env, fo);
844 if (!ofd_object_exists(fo))
845 GOTO(unlock, rc = -ENOENT);
847 th = ofd_trans_create(env, ofd);
849 GOTO(unlock, rc = PTR_ERR(th));
851 rc = dt_declare_ref_del(env, ofd_object_child(fo), th);
855 rc = dt_declare_destroy(env, ofd_object_child(fo), th);
/* Two start variants: the local one and the regular OFD one. The header
 * text above this function says orphans use the local transaction. */
860 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
862 rc = ofd_trans_start(env, ofd, NULL, th);
/* Drop the fmd kept for this export and object FID. */
866 ofd_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid);
/* NOTE(review): the return values of dt_ref_del() and dt_destroy() are not
 * captured, so a failure of either does not reach rc - confirm that this
 * is intended. */
868 dt_ref_del(env, ofd_object_child(fo), th);
869 dt_destroy(env, ofd_object_child(fo), th);
/* The current rc is handed to the transaction stop; its own result is
 * kept separately in rc2. */
871 rc2 = ofd_trans_stop(env, ofd, th, rc);
873 CERROR("%s failed to stop transaction: %d\n",
878 ofd_write_unlock(env, fo);
883 * Get OFD object attributes.
885 * This function gets OFD object regular attributes. It is used to serve
886 * incoming request as well as for local OFD purposes.
888 * \param[in] env execution environment
889 * \param[in] fo OFD object
890 * \param[in] la object attributes
892 * \retval 0 if successful
893 * \retval negative value on error
895 int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo,
902 if (ofd_object_exists(fo)) {
903 rc = dt_attr_get(env, ofd_object_child(fo), la);