Whamcloud - gitweb
LU-11186 ofd: fix for a final oid at sequence
[fs/lustre-release.git] / lustre / ofd / ofd_objects.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ofd/ofd_objects.c
33  *
34  * This file contains OSD API methods related to OBD Filter Device (OFD)
35  * object operations.
36  *
37  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
38  * Author: Mikhail Pershin <mike.pershin@intel.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_FILTER
42
43 #include <dt_object.h>
44 #include <lustre_lfsck.h>
45
46 #include "ofd_internal.h"
47
48 /**
49  * Get object version from disk and check it.
50  *
51  * This function checks object version from disk with
52  * ofd_thread_info::fti_pre_version filled from incoming RPC. This is part of
53  * VBR (Version-Based Recovery) and ensures that object has the same version
54  * upon replay as it has during original modification.
55  *
56  * \param[in]  info     execution thread OFD private data
57  * \param[in]  fo       OFD object
58  *
59  * \retval              0 if version matches
60  * \retval              -EOVERFLOW on version mismatch
61  */
62 static int ofd_version_get_check(struct ofd_thread_info *info,
63                                  struct ofd_object *fo)
64 {
65         dt_obj_version_t curr_version;
66
67         LASSERT(ofd_object_exists(fo));
68
69         if (info->fti_exp == NULL)
70                 RETURN(0);
71
72         curr_version = dt_version_get(info->fti_env, ofd_object_child(fo));
73         if ((__s64)curr_version == -EOPNOTSUPP)
74                 RETURN(0);
75         /* VBR: version is checked always because costs nothing */
76         if (info->fti_pre_version != 0 &&
77             info->fti_pre_version != curr_version) {
78                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
79                        info->fti_pre_version, curr_version);
80                 spin_lock(&info->fti_exp->exp_lock);
81                 info->fti_exp->exp_vbr_failed = 1;
82                 spin_unlock(&info->fti_exp->exp_lock);
83                 RETURN (-EOVERFLOW);
84         }
85         info->fti_pre_version = curr_version;
86         RETURN(0);
87 }
88
89 /**
90  * Get OFD object by FID.
91  *
92  * This function finds OFD slice of compound object with the given FID.
93  *
94  * \param[in] env       execution environment
95  * \param[in] ofd       OFD device
96  * \param[in] fid       FID of the object
97  *
98  * \retval              pointer to the found ofd_object
99  * \retval              ERR_PTR(errno) in case of error
100  */
101 struct ofd_object *ofd_object_find(const struct lu_env *env,
102                                    struct ofd_device *ofd,
103                                    const struct lu_fid *fid)
104 {
105         struct ofd_object *fo;
106         struct lu_object  *o;
107
108         ENTRY;
109
110         o = lu_object_find(env, &ofd->ofd_dt_dev.dd_lu_dev, fid, NULL);
111         if (likely(!IS_ERR(o)))
112                 fo = ofd_obj(o);
113         else
114                 fo = ERR_CAST(o); /* return error */
115
116         RETURN(fo);
117 }
118
119 /**
120  * Get FID of parent MDT object.
121  *
122  * This function reads extended attribute XATTR_NAME_FID of OFD object which
123  * contains the MDT parent object FID and saves it in ofd_object::ofo_ff.
124  *
125  * The filter_fid::ff_parent::f_ver field currently holds
126  * the OST-object index in the parent MDT-object's layout EA,
127  * not the actual FID::f_ver of the parent. We therefore access
128  * it via the macro f_stripe_idx.
129  *
130  * \param[in] env       execution environment
131  * \param[in] fo        OFD object
132  *
133  * \retval              0 if successful
134  * \retval              -ENODATA if there is no such xattr
135  * \retval              negative value on error
136  */
137 int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo)
138 {
139         struct ofd_thread_info *info = ofd_info(env);
140         struct filter_fid *ff = &fo->ofo_ff;
141         struct lu_buf *buf = &info->fti_buf;
142         int rc = 0;
143
144         if (fid_is_sane(&ff->ff_parent))
145                 return 0;
146
147         buf->lb_buf = ff;
148         buf->lb_len = sizeof(*ff);
149         rc = dt_xattr_get(env, ofd_object_child(fo), buf, XATTR_NAME_FID);
150         if (rc < 0)
151                 return rc;
152
153         if (unlikely(rc < sizeof(struct lu_fid))) {
154                 fid_zero(&ff->ff_parent);
155                 return -EINVAL;
156         }
157
158         filter_fid_le_to_cpu(ff, ff, rc);
159
160         return 0;
161 }
162
163 /**
164  * Precreate the given number \a nr of objects in the given sequence \a oseq.
165  *
166  * This function precreates new OST objects in the given sequence.
167  * The precreation starts from \a id and creates \a nr objects sequentially.
168  *
169  * Notes:
170  * This function may create fewer objects than requested.
171  *
172  * We mark object SUID+SGID to flag it for accepting UID+GID from client on
173  * first write. Currently the permission bits on the OST are never used,
174  * so this is OK.
175  *
176  * Initialize a/c/m time so any client timestamp will always be newer and
177  * update the inode. The ctime = 0 case is also handled specially in
178  * osd_inode_setattr(). See LU-221, LU-1042 for details.
179  *
180  * \param[in] env       execution environment
181  * \param[in] ofd       OFD device
182  * \param[in] id        object ID to start precreation from
183  * \param[in] oseq      object sequence
184  * \param[in] nr        number of objects to precreate
185  * \param[in] sync      synchronous precreation flag
186  *
187  * \retval              0 if successful
188  * \retval              negative value on error
189  */
190 int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
191                           u64 id, struct ofd_seq *oseq, int nr, int sync)
192 {
193         struct ofd_thread_info  *info = ofd_info(env);
194         struct ofd_object       *fo = NULL;
195         struct dt_object        *next;
196         struct thandle          *th;
197         struct ofd_object       **batch;
198         struct lu_fid           *fid = &info->fti_fid;
199         u64                     tmp;
200         int                     rc;
201         int                     rc2;
202         int                     i;
203         int                     objects = 0;
204         int                     nr_saved = nr;
205
206         ENTRY;
207
208         /* Don't create objects beyond the valid range for this SEQ */
209         if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
210                      (id + nr) > IDIF_MAX_OID)) {
211                 CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n",
212                        ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
213                 RETURN(rc = -ENOSPC);
214         } else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
215                             (id + nr) > OBIF_MAX_OID)) {
216                 CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n",
217                        ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
218                 RETURN(rc = -ENOSPC);
219         }
220
221         OBD_ALLOC(batch, nr_saved * sizeof(struct ofd_object *));
222         if (batch == NULL)
223                 RETURN(-ENOMEM);
224
225         info->fti_attr.la_valid = LA_TYPE | LA_MODE;
226         info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | S_ISVTX | 0666;
227         info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
228
229         info->fti_attr.la_valid |= LA_ATIME | LA_MTIME | LA_CTIME;
230         info->fti_attr.la_atime = 0;
231         info->fti_attr.la_mtime = 0;
232         info->fti_attr.la_ctime = 0;
233
234         LASSERT(id != 0);
235
236         /* prepare objects */
237         *fid = *lu_object_fid(&oseq->os_lastid_obj->do_lu);
238         for (i = 0; i < nr; i++) {
239                 rc = fid_set_id(fid, id + i);
240                 if (rc != 0) {
241                         if (i == 0)
242                                 GOTO(out, rc);
243
244                         nr = i;
245                         break;
246                 }
247
248                 fo = ofd_object_find(env, ofd, fid);
249                 if (IS_ERR(fo)) {
250                         if (i == 0)
251                                 GOTO(out, rc = PTR_ERR(fo));
252
253                         nr = i;
254                         break;
255                 }
256
257                 ofd_write_lock(env, fo);
258                 batch[i] = fo;
259         }
260         info->fti_buf.lb_buf = &tmp;
261         info->fti_buf.lb_len = sizeof(tmp);
262         info->fti_off = 0;
263
264         th = ofd_trans_create(env, ofd);
265         if (IS_ERR(th))
266                 GOTO(out, rc = PTR_ERR(th));
267
268         th->th_sync |= sync;
269
270         rc = dt_declare_record_write(env, oseq->os_lastid_obj, &info->fti_buf,
271                                      info->fti_off, th);
272         if (rc)
273                 GOTO(trans_stop, rc);
274
275         for (i = 0; i < nr; i++) {
276                 fo = batch[i];
277                 LASSERT(fo);
278
279                 if (unlikely(ofd_object_exists(fo))) {
280                         /* object may exist being re-created by write replay */
281                         CDEBUG(D_INODE, "object %#llx/%#llx exists: "
282                                DFID"\n", ostid_seq(&oseq->os_oi), id,
283                                PFID(lu_object_fid(&fo->ofo_obj.do_lu)));
284                         continue;
285                 }
286
287                 next = ofd_object_child(fo);
288                 LASSERT(next != NULL);
289
290                 rc = dt_declare_create(env, next, &info->fti_attr, NULL,
291                                        &info->fti_dof, th);
292                 if (rc < 0) {
293                         if (i == 0)
294                                 GOTO(trans_stop, rc);
295
296                         nr = i;
297                         break;
298                 }
299         }
300
301         rc = dt_trans_start_local(env, ofd->ofd_osd, th);
302         if (rc)
303                 GOTO(trans_stop, rc);
304
305         CDEBUG(D_OTHER, "%s: create new object "DFID" nr %d\n",
306                ofd_name(ofd), PFID(fid), nr);
307
308          /* When the LFSCK scanning the whole device to verify the LAST_ID file
309           * consistency, it will load the last_id into RAM firstly, and compare
310           * the last_id with each OST-object's ID. If the later one is larger,
311           * then it will regard the LAST_ID file crashed. But during the LFSCK
312           * scanning, the OFD may continue to create new OST-objects. Those new
313           * created OST-objects will have larger IDs than the LFSCK known ones.
314           * So from the LFSCK view, it needs to re-load the last_id from disk
315           * file, and if the latest last_id is still smaller than the object's
316           * ID, then the LAST_ID file is real crashed.
317           *
318           * To make above mechanism to work, before OFD pre-create OST-objects,
319           * it needs to update the LAST_ID file firstly, otherwise, the LFSCK
320           * may cannot get latest last_id although new OST-object created. */
321         if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_SKIP_LASTID)) {
322                 tmp = cpu_to_le64(id + nr - 1);
323                 dt_write_lock(env, oseq->os_lastid_obj, 0);
324                 rc = dt_record_write(env, oseq->os_lastid_obj,
325                                      &info->fti_buf, &info->fti_off, th);
326                 dt_write_unlock(env, oseq->os_lastid_obj);
327                 if (rc != 0)
328                         GOTO(trans_stop, rc);
329         }
330
331         for (i = 0; i < nr; i++) {
332                 fo = batch[i];
333                 LASSERT(fo);
334
335                 /* Only the new created objects need to be recorded. */
336                 if (ofd->ofd_osd->dd_record_fid_accessed) {
337                         struct lfsck_req_local *lrl = &ofd_info(env)->fti_lrl;
338
339                         lfsck_pack_rfa(lrl, lu_object_fid(&fo->ofo_obj.do_lu),
340                                        LEL_FID_ACCESSED, LFSCK_TYPE_LAYOUT);
341                         lfsck_in_notify_local(env, ofd->ofd_osd, lrl, NULL);
342                 }
343
344                 if (likely(!ofd_object_exists(fo) &&
345                            !OBD_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING))) {
346                         next = ofd_object_child(fo);
347                         LASSERT(next != NULL);
348
349                         rc = dt_create(env, next, &info->fti_attr, NULL,
350                                        &info->fti_dof, th);
351                         if (rc < 0) {
352                                 if (i == 0)
353                                         GOTO(trans_stop, rc);
354
355                                 rc = 0;
356                                 break;
357                         }
358                         LASSERT(ofd_object_exists(fo));
359                 }
360                 ofd_seq_last_oid_set(oseq, id + i);
361         }
362
363         objects = i;
364         /* NOT all the wanted objects have been created,
365          * set the LAST_ID as the real created. */
366         if (unlikely(objects < nr)) {
367                 int rc1;
368
369                 info->fti_off = 0;
370                 tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
371                 dt_write_lock(env, oseq->os_lastid_obj, 0);
372                 rc1 = dt_record_write(env, oseq->os_lastid_obj,
373                                       &info->fti_buf, &info->fti_off, th);
374                 dt_write_unlock(env, oseq->os_lastid_obj);
375                 if (rc1 != 0)
376                         CERROR("%s: fail to reset the LAST_ID for seq (%#llx"
377                                ") from %llu to %llu\n", ofd_name(ofd),
378                                ostid_seq(&oseq->os_oi), id + nr - 1,
379                                ofd_seq_last_oid(oseq));
380         }
381
382 trans_stop:
383         rc2 = ofd_trans_stop(env, ofd, th, rc);
384         if (rc2)
385                 CERROR("%s: failed to stop transaction: rc = %d\n",
386                        ofd_name(ofd), rc2);
387         if (!rc)
388                 rc = rc2;
389 out:
390         for (i = 0; i < nr_saved; i++) {
391                 fo = batch[i];
392                 if (fo) {
393                         ofd_write_unlock(env, fo);
394                         ofd_object_put(env, fo);
395                 }
396         }
397         OBD_FREE(batch, nr_saved * sizeof(struct ofd_object *));
398
399         CDEBUG((objects == 0 && rc == 0) ? D_ERROR : D_OTHER,
400                "created %d/%d objects: %d\n", objects, nr_saved, rc);
401
402         LASSERT(ergo(objects == 0, rc < 0));
403         RETURN(objects > 0 ? objects : rc);
404 }
405
406 /**
407  * Fix the OFD object ownership.
408  *
409  * If the object still has SUID+SGID bits set, meaning that it was precreated
410  * by the MDT before it was assigned to any file, (see ofd_precreate_objects())
411  * then we will accept the UID/GID/PROJID if sent by the client for initializing
412  * the ownership of this object.  We only allow this to happen once (so clear
413  * these bits) and later only allow setattr.
414  *
415  * \param[in] env        execution environment
416  * \param[in] fo         OFD object
417  * \param[in] la         object attributes
418  * \param[in] is_setattr was this function called from setattr or not
419  *
420  * \retval              0 if successful
421  * \retval              negative value on error
422  */
423 int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo,
424                          struct lu_attr *la, int is_setattr)
425 {
426         struct ofd_thread_info  *info = ofd_info(env);
427         struct lu_attr          *ln = &info->fti_attr2;
428         __u32                    mask = 0;
429         int                      rc;
430
431         ENTRY;
432
433         if (!(la->la_valid & LA_UID) && !(la->la_valid & LA_GID) &&
434             !(la->la_valid & LA_PROJID))
435                 RETURN(0);
436
437         rc = dt_attr_get(env, ofd_object_child(fo), ln);
438         if (rc != 0)
439                 RETURN(rc);
440
441         LASSERT(ln->la_valid & LA_MODE);
442
443         /*
444          * Only allow setattr to change UID/GID/PROJID, if
445          * SUID+SGID is not set which means this is not
446          * initialization of this objects.
447          */
448         if (!is_setattr) {
449                 if (!(ln->la_mode & S_ISUID))
450                         la->la_valid &= ~LA_UID;
451                 if (!(ln->la_mode & S_ISGID))
452                         la->la_valid &= ~LA_GID;
453                 if (!(ln->la_mode & S_ISVTX))
454                         la->la_valid &= ~LA_PROJID;
455         }
456
457         /* Initialize ownership of this object, clear SUID+SGID bits*/
458         if ((la->la_valid & LA_UID) && (ln->la_mode & S_ISUID))
459                 mask |= S_ISUID;
460         if ((la->la_valid & LA_GID) && (ln->la_mode & S_ISGID))
461                 mask |= S_ISGID;
462         if ((la->la_valid & LA_PROJID) && (ln->la_mode & S_ISVTX))
463                 mask |= S_ISVTX;
464         if (mask != 0) {
465                 if (!(la->la_valid & LA_MODE) || !is_setattr) {
466                         la->la_mode = ln->la_mode;
467                         la->la_valid |= LA_MODE;
468                 }
469                 la->la_mode &= ~mask;
470         }
471
472         RETURN(0);
473 }
474
475 /**
476  * Check if it needs to update filter_fid by the value of @oa.
477  *
478  * \param[in] env       env
479  * \param[in] fo        ofd object
480  * \param[in] oa        obdo from client or MDT
481  * \param[out] ff       if filter_fid needs updating, this field is used to
482  *                      return the new buffer
483  *
484  * \retval < 0          error occurred
485  * \retval 0            doesn't need to update filter_fid
486  * \retval FL_XATTR_{CREATE,REPLACE}    flag for xattr update
487  */
488 int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo,
489                          const struct obdo *oa, struct filter_fid *ff)
490 {
491         int rc = 0;
492         ENTRY;
493
494         if (!(oa->o_valid &
495               (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION)))
496                 RETURN(0);
497
498         rc = ofd_object_ff_load(env, fo);
499         if (rc < 0 && rc != -ENODATA)
500                 RETURN(rc);
501
502         LASSERT(ff != &fo->ofo_ff);
503         if (rc == -ENODATA) {
504                 rc = LU_XATTR_CREATE;
505                 memset(ff, 0, sizeof(*ff));
506         } else {
507                 rc = LU_XATTR_REPLACE;
508                 memcpy(ff, &fo->ofo_ff, sizeof(*ff));
509         }
510
511         if (oa->o_valid & OBD_MD_FLFID) {
512                 /* packing fid and converting it to LE for storing into EA.
513                  * Here ->o_stripe_idx should be filled by LOV and rest of
514                  * fields - by client. */
515                 ff->ff_parent.f_seq = oa->o_parent_seq;
516                 ff->ff_parent.f_oid = oa->o_parent_oid;
517                 /* XXX: we are ignoring o_parent_ver here, since this should
518                  *      be the same for all objects in this fileset. */
519                 ff->ff_parent.f_ver = oa->o_stripe_idx;
520         }
521         if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
522                 ff->ff_layout = oa->o_layout;
523
524         if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
525                 CDEBUG(D_INODE, DFID": OST("DFID") layout version %u -> %u\n",
526                        PFID(&fo->ofo_ff.ff_parent),
527                        PFID(lu_object_fid(&fo->ofo_obj.do_lu)),
528                        ff->ff_layout_version, oa->o_layout_version);
529
530                 /* only the MDS has the authority to update layout version */
531                 if (!(exp_connect_flags(ofd_info(env)->fti_exp) &
532                       OBD_CONNECT_MDS)) {
533                         CERROR(DFID": update layout version from client\n",
534                                PFID(&fo->ofo_ff.ff_parent));
535
536                         RETURN(-EPERM);
537                 }
538
539                 if (ff->ff_layout_version & LU_LAYOUT_RESYNC) {
540                         /* this opens a new era of writing */
541                         ff->ff_layout_version = 0;
542                         ff->ff_range = 0;
543                 }
544
545                 /* it's not allowed to change it to a smaller value */
546                 if (oa->o_layout_version < ff->ff_layout_version)
547                         RETURN(-EINVAL);
548
549                 if (ff->ff_layout_version == 0 ||
550                     oa->o_layout_version & LU_LAYOUT_RESYNC) {
551                         /* if LU_LAYOUT_RESYNC is set, it closes the era of
552                          * writing. Only mirror I/O can write this object. */
553                         ff->ff_layout_version = oa->o_layout_version;
554                         ff->ff_range = 0;
555                 } else if (oa->o_layout_version > ff->ff_layout_version) {
556                         ff->ff_range = MAX(ff->ff_range,
557                                   oa->o_layout_version - ff->ff_layout_version);
558                 }
559         }
560
561         if (memcmp(ff, &fo->ofo_ff, sizeof(*ff)))
562                 filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
563         else /* no change */
564                 rc = 0;
565
566         RETURN(rc);
567 }
568
569 /**
570  * Set OFD object attributes.
571  *
572  * This function sets OFD object attributes taken from incoming request.
573  * It sets not only regular attributes but also XATTR_NAME_FID extended
574  * attribute if needed. The "fid" xattr allows the object's MDT parent inode
575  * to be found and verified by LFSCK and other tools in case of inconsistency.
576  *
577  * \param[in] env       execution environment
578  * \param[in] fo        OFD object
579  * \param[in] la        object attributes
580  * \param[in] oa        obdo carries fid, ost_layout, layout version
581  *
582  * \retval              0 if successful
583  * \retval              negative value on error
584  */
585 int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
586                  struct lu_attr *la, struct obdo *oa)
587 {
588         struct ofd_thread_info  *info = ofd_info(env);
589         struct ofd_device       *ofd = ofd_obj2dev(fo);
590         struct filter_fid       *ff = &info->fti_mds_fid;
591         struct thandle          *th;
592         struct ofd_mod_data     *fmd;
593         int                     fl;
594         int                     rc;
595         int                     rc2;
596         ENTRY;
597
598         ofd_write_lock(env, fo);
599         if (!ofd_object_exists(fo))
600                 GOTO(unlock, rc = -ENOENT);
601
602         if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
603                 fmd = ofd_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid);
604                 if (fmd && fmd->fmd_mactime_xid < info->fti_xid)
605                         fmd->fmd_mactime_xid = info->fti_xid;
606                 ofd_fmd_put(info->fti_exp, fmd);
607         }
608
609         /* VBR: version recovery check */
610         rc = ofd_version_get_check(info, fo);
611         if (rc)
612                 GOTO(unlock, rc);
613
614         rc = ofd_attr_handle_id(env, fo, la, 1 /* is_setattr */);
615         if (rc != 0)
616                 GOTO(unlock, rc);
617
618         fl = ofd_object_ff_update(env, fo, oa, ff);
619         if (fl < 0)
620                 GOTO(unlock, rc = fl);
621
622         th = ofd_trans_create(env, ofd);
623         if (IS_ERR(th))
624                 GOTO(unlock, rc = PTR_ERR(th));
625
626         rc = dt_declare_attr_set(env, ofd_object_child(fo), la, th);
627         if (rc)
628                 GOTO(stop, rc);
629
630         if (fl) {
631                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
632                         ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
633                 else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
634                         le32_add_cpu(&ff->ff_parent.f_oid, -1);
635
636                 info->fti_buf.lb_buf = ff;
637                 info->fti_buf.lb_len = sizeof(*ff);
638                 rc = dt_declare_xattr_set(env, ofd_object_child(fo),
639                                           &info->fti_buf, XATTR_NAME_FID, fl,
640                                           th);
641                 if (rc)
642                         GOTO(stop, rc);
643         }
644
645         rc = ofd_trans_start(env, ofd, la->la_valid & LA_SIZE ? fo : NULL, th);
646         if (rc)
647                 GOTO(stop, rc);
648
649         rc = dt_attr_set(env, ofd_object_child(fo), la, th);
650         if (rc)
651                 GOTO(stop, rc);
652
653         if (fl) {
654                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
655                         GOTO(stop, rc);
656
657                 info->fti_buf.lb_buf = ff;
658                 info->fti_buf.lb_len = sizeof(*ff);
659                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
660                                   XATTR_NAME_FID, fl, th);
661                 if (!rc)
662                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
663         }
664
665         GOTO(stop, rc);
666
667 stop:
668         rc2 = ofd_trans_stop(env, ofd, th, rc);
669         if (rc2)
670                 CERROR("%s: failed to stop transaction: rc = %d\n",
671                        ofd_name(ofd), rc2);
672         if (!rc)
673                 rc = rc2;
674
675 unlock:
676         ofd_write_unlock(env, fo);
677
678         return rc;
679 }
680
681 /**
682  * Truncate/punch OFD object.
683  *
684  * This function frees all of the allocated object's space from the \a start
685  * offset to the \a end offset. For truncate() operations the \a end offset
686  * is OBD_OBJECT_EOF. The functionality to punch holes in an object via
687  * fallocate(FALLOC_FL_PUNCH_HOLE) is not yet implemented (see LU-3606).
688  *
689  * \param[in] env       execution environment
690  * \param[in] fo        OFD object
691  * \param[in] start     start offset to punch from
692  * \param[in] end       end of punch
693  * \param[in] la        object attributes
694  * \param[in] oa        obdo struct from incoming request
695  *
696  * \retval              0 if successful
697  * \retval              negative value on error
698  */
699 int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
700                      __u64 start, __u64 end, struct lu_attr *la,
701                      struct obdo *oa)
702 {
703         struct ofd_thread_info  *info = ofd_info(env);
704         struct ofd_device       *ofd = ofd_obj2dev(fo);
705         struct ofd_mod_data     *fmd;
706         struct dt_object        *dob = ofd_object_child(fo);
707         struct filter_fid       *ff = &info->fti_mds_fid;
708         struct thandle          *th;
709         int                     fl;
710         int                     rc;
711         int                     rc2;
712
713         ENTRY;
714
715         /* we support truncate, not punch yet */
716         LASSERT(end == OBD_OBJECT_EOF);
717
718         ofd_write_lock(env, fo);
719         fmd = ofd_fmd_get(info->fti_exp, &fo->ofo_header.loh_fid);
720         if (fmd && fmd->fmd_mactime_xid < info->fti_xid)
721                 fmd->fmd_mactime_xid = info->fti_xid;
722         ofd_fmd_put(info->fti_exp, fmd);
723
724         if (!ofd_object_exists(fo))
725                 GOTO(unlock, rc = -ENOENT);
726
727         if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
728                 rc = ofd_verify_ff(env, fo, oa);
729                 if (rc != 0)
730                         GOTO(unlock, rc);
731         }
732
733         /* need to verify layout version */
734         if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
735                 rc = ofd_verify_layout_version(env, fo, oa);
736                 if (rc)
737                         GOTO(unlock, rc);
738
739                 oa->o_valid &= ~OBD_MD_LAYOUT_VERSION;
740         }
741
742         /* VBR: version recovery check */
743         rc = ofd_version_get_check(info, fo);
744         if (rc)
745                 GOTO(unlock, rc);
746
747         rc = ofd_attr_handle_id(env, fo, la, 0 /* !is_setattr */);
748         if (rc != 0)
749                 GOTO(unlock, rc);
750
751         fl = ofd_object_ff_update(env, fo, oa, ff);
752         if (fl < 0)
753                 GOTO(unlock, rc = fl);
754
755         th = ofd_trans_create(env, ofd);
756         if (IS_ERR(th))
757                 GOTO(unlock, rc = PTR_ERR(th));
758
759         rc = dt_declare_attr_set(env, dob, la, th);
760         if (rc)
761                 GOTO(stop, rc);
762
763         rc = dt_declare_punch(env, dob, start, OBD_OBJECT_EOF, th);
764         if (rc)
765                 GOTO(stop, rc);
766
767         if (fl) {
768                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
769                         ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
770                 else if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
771                         le32_add_cpu(&ff->ff_parent.f_oid, -1);
772
773                 info->fti_buf.lb_buf = ff;
774                 info->fti_buf.lb_len = sizeof(*ff);
775                 rc = dt_declare_xattr_set(env, ofd_object_child(fo),
776                                           &info->fti_buf, XATTR_NAME_FID, fl,
777                                           th);
778                 if (rc)
779                         GOTO(stop, rc);
780         }
781
782         rc = ofd_trans_start(env, ofd, fo, th);
783         if (rc)
784                 GOTO(stop, rc);
785
786         rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th);
787         if (rc)
788                 GOTO(stop, rc);
789
790         rc = dt_attr_set(env, dob, la, th);
791         if (rc)
792                 GOTO(stop, rc);
793
794         if (fl) {
795                 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
796                         GOTO(stop, rc);
797
798                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
799                                   XATTR_NAME_FID, fl, th);
800                 if (!rc)
801                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
802         }
803
804         GOTO(stop, rc);
805
806 stop:
807         rc2 = ofd_trans_stop(env, ofd, th, rc);
808         if (rc2 != 0)
809                 CERROR("%s: failed to stop transaction: rc = %d\n",
810                        ofd_name(ofd), rc2);
811         if (!rc)
812                 rc = rc2;
813 unlock:
814         ofd_write_unlock(env, fo);
815
816         return rc;
817 }
818
819 /**
820  * Destroy OFD object.
821  *
822  * This function destroys OFD object. If object wasn't used at all (orphan)
823  * then local transaction is used, which means the transaction data is not
824  * returned back in reply.
825  *
826  * \param[in] env       execution environment
827  * \param[in] fo        OFD object
828  * \param[in] orphan    flag to indicate that object is orphaned
829  *
830  * \retval              0 if successful
831  * \retval              negative value on error
832  */
833 int ofd_destroy(const struct lu_env *env, struct ofd_object *fo,
834                        int orphan)
835 {
836         struct ofd_device       *ofd = ofd_obj2dev(fo);
837         struct thandle          *th;
838         int                     rc = 0;
839         int                     rc2;
840
841         ENTRY;
842
843         ofd_write_lock(env, fo);
844         if (!ofd_object_exists(fo))
845                 GOTO(unlock, rc = -ENOENT);
846
847         th = ofd_trans_create(env, ofd);
848         if (IS_ERR(th))
849                 GOTO(unlock, rc = PTR_ERR(th));
850
851         rc = dt_declare_ref_del(env, ofd_object_child(fo), th);
852         if (rc < 0)
853                 GOTO(stop, rc);
854
855         rc = dt_declare_destroy(env, ofd_object_child(fo), th);
856         if (rc < 0)
857                 GOTO(stop, rc);
858
859         if (orphan)
860                 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
861         else
862                 rc = ofd_trans_start(env, ofd, NULL, th);
863         if (rc)
864                 GOTO(stop, rc);
865
866         ofd_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid);
867
868         dt_ref_del(env, ofd_object_child(fo), th);
869         dt_destroy(env, ofd_object_child(fo), th);
870 stop:
871         rc2 = ofd_trans_stop(env, ofd, th, rc);
872         if (rc2)
873                 CERROR("%s failed to stop transaction: %d\n",
874                        ofd_name(ofd), rc2);
875         if (!rc)
876                 rc = rc2;
877 unlock:
878         ofd_write_unlock(env, fo);
879         RETURN(rc);
880 }
881
882 /**
883  * Get OFD object attributes.
884  *
885  * This function gets OFD object regular attributes. It is used to serve
886  * incoming request as well as for local OFD purposes.
887  *
888  * \param[in] env       execution environment
889  * \param[in] fo        OFD object
890  * \param[in] la        object attributes
891  *
892  * \retval              0 if successful
893  * \retval              negative value on error
894  */
895 int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo,
896                  struct lu_attr *la)
897 {
898         int rc = 0;
899
900         ENTRY;
901
902         if (ofd_object_exists(fo)) {
903                 rc = dt_attr_get(env, ofd_object_child(fo), la);
904         } else {
905                 rc = -ENOENT;
906         }
907         RETURN(rc);
908 }