Whamcloud - gitweb
fe74fa1524f5f67426f1ece88825ccf61a3941c7
[fs/lustre-release.git] / lustre / ofd / ofd_objects.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/ofd/ofd_objects.c
32  *
33  * This file contains OSD API methods related to OBD Filter Device (OFD)
34  * object operations.
35  *
36  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
37  * Author: Mikhail Pershin <mike.pershin@intel.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_FILTER
41
42 #include <dt_object.h>
43 #include <lustre_lfsck.h>
44 #include <lustre_export.h>
45
46 #include "ofd_internal.h"
47
48 /**
49  * Get object version from disk and check it.
50  *
51  * This function checks object version from disk with
52  * ofd_thread_info::fti_pre_version filled from incoming RPC. This is part of
53  * VBR (Version-Based Recovery) and ensures that object has the same version
54  * upon replay as it has during original modification.
55  *
56  * \param[in]  info     execution thread OFD private data
57  * \param[in]  fo       OFD object
58  *
59  * \retval              0 if version matches
60  * \retval              -EOVERFLOW on version mismatch
61  */
62 static int ofd_version_get_check(struct ofd_thread_info *info,
63                                  struct ofd_object *fo)
64 {
65         dt_obj_version_t curr_version;
66
67         if (info->fti_exp == NULL)
68                 RETURN(0);
69
70         curr_version = dt_version_get(info->fti_env, ofd_object_child(fo));
71         if ((__s64)curr_version == -EOPNOTSUPP)
72                 RETURN(0);
73         /* VBR: version is checked always because costs nothing */
74         if (info->fti_pre_version != 0 &&
75             info->fti_pre_version != curr_version) {
76                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
77                        info->fti_pre_version, curr_version);
78                 spin_lock(&info->fti_exp->exp_lock);
79                 info->fti_exp->exp_vbr_failed = 1;
80                 spin_unlock(&info->fti_exp->exp_lock);
81                 RETURN (-EOVERFLOW);
82         }
83         info->fti_pre_version = curr_version;
84         RETURN(0);
85 }
86
87 /**
88  * Get OFD object by FID.
89  *
90  * This function finds OFD slice of compound object with the given FID.
91  *
92  * \param[in] env       execution environment
93  * \param[in] ofd       OFD device
94  * \param[in] fid       FID of the object
95  *
96  * \retval              pointer to the found ofd_object
97  * \retval              ERR_PTR(errno) in case of error
98  */
99 struct ofd_object *ofd_object_find(const struct lu_env *env,
100                                    struct ofd_device *ofd,
101                                    const struct lu_fid *fid)
102 {
103         struct ofd_object *fo;
104         struct lu_object  *o;
105
106         ENTRY;
107
108         o = lu_object_find(env, &ofd->ofd_dt_dev.dd_lu_dev, fid, NULL);
109         if (likely(!IS_ERR(o)))
110                 fo = ofd_obj(o);
111         else
112                 fo = ERR_CAST(o); /* return error */
113
114         RETURN(fo);
115 }
116
117 /**
118  * Get FID of parent MDT object.
119  *
120  * This function reads extended attribute XATTR_NAME_FID of OFD object which
121  * contains the MDT parent object FID and saves it in ofd_object::ofo_ff.
122  *
123  * The filter_fid::ff_parent::f_ver field currently holds
124  * the OST-object index in the parent MDT-object's layout EA,
125  * not the actual FID::f_ver of the parent. We therefore access
126  * it via the macro f_stripe_idx.
127  *
128  * \param[in] env       execution environment
129  * \param[in] fo        OFD object
130  *
131  * \retval              0 if successful
132  * \retval              -ENODATA if there is no such xattr
133  * \retval              negative value on error
134  */
135 int ofd_object_ff_load(const struct lu_env *env, struct ofd_object *fo)
136 {
137         struct ofd_thread_info *info = ofd_info(env);
138         struct filter_fid *ff = &fo->ofo_ff;
139         struct lu_buf *buf = &info->fti_buf;
140         int rc = 0;
141
142         if (fid_is_sane(&ff->ff_parent))
143                 return 0;
144
145         buf->lb_buf = ff;
146         buf->lb_len = sizeof(*ff);
147         rc = dt_xattr_get(env, ofd_object_child(fo), buf, XATTR_NAME_FID);
148         if (rc < 0)
149                 return rc;
150
151         if (unlikely(rc < sizeof(struct lu_fid))) {
152                 fid_zero(&ff->ff_parent);
153                 return -EINVAL;
154         }
155
156         filter_fid_le_to_cpu(ff, ff, rc);
157
158         return 0;
159 }
160
161 struct ofd_precreate_cb {
162         struct dt_txn_commit_cb  opc_cb;
163         struct ofd_seq          *opc_oseq;
164         int                      opc_objects;
165 };
166
167 static void ofd_cb_precreate(struct lu_env *env, struct thandle *th,
168                              struct dt_txn_commit_cb *cb, int err)
169 {
170         struct ofd_precreate_cb *opc;
171         struct ofd_seq *oseq;
172
173         opc = container_of(cb, struct ofd_precreate_cb, opc_cb);
174         oseq = opc->opc_oseq;
175
176         CDEBUG(D_OTHER, "Sub %d from %d for "DFID", th_sync %d\n",
177                opc->opc_objects, atomic_read(&oseq->os_precreate_in_progress),
178                PFID(&oseq->os_oi.oi_fid), th->th_sync);
179         atomic_sub(opc->opc_objects, &oseq->os_precreate_in_progress);
180         ofd_seq_put(env, opc->opc_oseq);
181         OBD_FREE_PTR(opc);
182 }
183
184 static int ofd_precreate_cb_add(const struct lu_env *env, struct thandle *th,
185                                 struct ofd_seq *oseq, int objects)
186 {
187         struct ofd_precreate_cb *opc;
188         struct dt_txn_commit_cb *dcb;
189         int precreate, rc;
190
191         OBD_ALLOC_PTR(opc);
192         if (!opc)
193                 return -ENOMEM;
194
195         precreate = atomic_read(&oseq->os_precreate_in_progress);
196         refcount_inc(&oseq->os_refc);
197         opc->opc_oseq = oseq;
198         opc->opc_objects = objects;
199         CDEBUG(D_OTHER, "Add %d to %d for "DFID", th_sync %d\n",
200                opc->opc_objects, precreate,
201                PFID(&oseq->os_oi.oi_fid), th->th_sync);
202
203         if ((precreate + objects) >= (5 * OST_MAX_PRECREATE))
204                 th->th_sync = 1;
205
206         dcb = &opc->opc_cb;
207         dcb->dcb_func = ofd_cb_precreate;
208         INIT_LIST_HEAD(&dcb->dcb_linkage);
209         strscpy(dcb->dcb_name, "ofd_cb_precreate", sizeof(dcb->dcb_name));
210
211         rc = dt_trans_cb_add(th, dcb);
212         if (rc) {
213                 ofd_seq_put(env, oseq);
214                 OBD_FREE_PTR(opc);
215                 return rc;
216         }
217
218         atomic_add(objects, &oseq->os_precreate_in_progress);
219
220         return 0;
221 }
222
223 /**
224  * Precreate the given number \a nr of objects in the given sequence \a oseq.
225  *
226  * This function precreates new OST objects in the given sequence.
227  * The precreation starts from \a id and creates \a nr objects sequentially.
228  *
229  * Notes:
230  * This function may create fewer objects than requested.
231  *
232  * We mark object SUID+SGID to flag it for accepting UID+GID from client on
233  * first write. Currently the permission bits on the OST are never used,
234  * so this is OK.
235  *
236  * Initialize a/c/m time so any client timestamp will always be newer and
237  * update the inode. The ctime = 0 case is also handled specially in
238  * osd_inode_setattr(). See LU-221, LU-1042 for details.
239  *
240  * \param[in] env               execution environment
241  * \param[in] ofd               OFD device
242  * \param[in] id                object ID to start precreation from
243  * \param[in] oseq              object sequence
244  * \param[in] nr                number of objects to precreate
245  * \param[in] sync              synchronous precreation flag
246  * \param[in] trans_local       start local transaction
247  *
248  * \retval              0 if successful
249  * \retval              negative value on error
250  */
251 int ofd_precreate_objects(const struct lu_env *env, struct ofd_device *ofd,
252                           u64 id, struct ofd_seq *oseq, int nr, int sync,
253                           bool trans_local)
254 {
255         struct ofd_thread_info  *info = ofd_info(env);
256         struct ofd_object       *fo = NULL;
257         struct dt_object        *next;
258         struct thandle          *th;
259         struct ofd_object       **batch;
260         struct lu_fid           *fid = &info->fti_fid;
261         u64                     tmp;
262         int                     rc;
263         int                     rc2;
264         int                     i;
265         int                     objects = 0;
266         int                     nr_saved = nr;
267
268         ENTRY;
269
270         /* Don't create objects beyond the valid range for this SEQ
271          * Last object to create is (id + nr - 1), but we move -1 on LHS
272          * to +1 on RHS to evaluate constant at compile time. LU-11186
273          */
274         if (unlikely(fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
275                      id + nr > IDIF_MAX_OID + 1)) {
276                 CERROR("%s:"DOSTID" hit the IDIF_MAX_OID (1<<48)!\n",
277                        ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
278                 RETURN(rc = -ENOSPC);
279         } else if (unlikely(!fid_seq_is_mdt0(ostid_seq(&oseq->os_oi)) &&
280                             id + nr > OBIF_MAX_OID + 1)) {
281                 CERROR("%s:"DOSTID" hit the OBIF_MAX_OID (1<<32)!\n",
282                        ofd_name(ofd), id, ostid_seq(&oseq->os_oi));
283                 RETURN(rc = -ENOSPC);
284         }
285
286         OBD_ALLOC_PTR_ARRAY(batch, nr_saved);
287         if (batch == NULL)
288                 RETURN(-ENOMEM);
289
290         info->fti_attr.la_valid = LA_TYPE | LA_MODE;
291         info->fti_attr.la_mode = S_IFREG | S_ISUID | S_ISGID | S_ISVTX | 0666;
292         info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
293
294         info->fti_attr.la_valid |= LA_ATIME | LA_MTIME | LA_CTIME;
295         info->fti_attr.la_atime = 0;
296         info->fti_attr.la_mtime = 0;
297         info->fti_attr.la_ctime = 0;
298
299         LASSERT(id != 0);
300
301         /* prepare objects */
302         *fid = *lu_object_fid(&oseq->os_lastid_obj->do_lu);
303         for (i = 0; i < nr; i++) {
304                 rc = fid_set_id(fid, id + i);
305                 if (rc != 0) {
306                         if (i == 0)
307                                 GOTO(out, rc);
308
309                         nr = i;
310                         break;
311                 }
312
313                 fo = ofd_object_find(env, ofd, fid);
314                 if (IS_ERR(fo)) {
315                         if (i == 0)
316                                 GOTO(out, rc = PTR_ERR(fo));
317
318                         nr = i;
319                         break;
320                 }
321
322                 batch[i] = fo;
323         }
324         info->fti_buf.lb_buf = &tmp;
325         info->fti_buf.lb_len = sizeof(tmp);
326         info->fti_off = 0;
327
328         th = ofd_trans_create(env, ofd);
329         if (IS_ERR(th))
330                 GOTO(out, rc = PTR_ERR(th));
331
332         th->th_sync |= sync;
333
334         rc = dt_declare_record_write(env, oseq->os_lastid_obj, &info->fti_buf,
335                                      info->fti_off, th);
336         if (rc)
337                 GOTO(trans_stop, rc);
338
339         for (i = 0; i < nr; i++) {
340                 fo = batch[i];
341                 LASSERT(fo);
342
343                 if (unlikely(ofd_object_exists(fo))) {
344                         /* object may exist being re-created by write replay */
345                         CDEBUG(D_INODE, "object %#llx/%#llx exists: "
346                                DFID"\n", ostid_seq(&oseq->os_oi), id,
347                                PFID(lu_object_fid(&fo->ofo_obj.do_lu)));
348                         continue;
349                 }
350
351                 next = ofd_object_child(fo);
352                 LASSERT(next != NULL);
353
354                 rc = dt_declare_create(env, next, &info->fti_attr, NULL,
355                                        &info->fti_dof, th);
356                 if (rc < 0) {
357                         if (i == 0)
358                                 GOTO(trans_stop, rc);
359
360                         nr = i;
361                         break;
362                 }
363         }
364
365         /* Only needed for MDS+OSS rolling upgrade interop with 2.16+older. */
366         if (unlikely(trans_local))
367                 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
368         else
369                 rc = dt_trans_start(env, ofd->ofd_osd, th);
370         if (rc)
371                 GOTO(trans_stop, rc);
372
373         CDEBUG(D_OTHER, "%s: create new object "DFID" nr %d\n",
374                ofd_name(ofd), PFID(fid), nr);
375
376          /* When the LFSCK scanning the whole device to verify the LAST_ID file
377           * consistency, it will load the last_id into RAM firstly, and compare
378           * the last_id with each OST-object's ID. If the later one is larger,
379           * then it will regard the LAST_ID file crashed. But during the LFSCK
380           * scanning, the OFD may continue to create new OST-objects. Those new
381           * created OST-objects will have larger IDs than the LFSCK known ones.
382           * So from the LFSCK view, it needs to re-load the last_id from disk
383           * file, and if the latest last_id is still smaller than the object's
384           * ID, then the LAST_ID file is real crashed.
385           *
386           * To make above mechanism to work, before OFD pre-create OST-objects,
387           * it needs to update the LAST_ID file firstly, otherwise, the LFSCK
388           * may cannot get latest last_id although new OST-object created. */
389         if (!CFS_FAIL_CHECK(OBD_FAIL_LFSCK_SKIP_LASTID)) {
390                 tmp = cpu_to_le64(id + nr - 1);
391                 dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID);
392                 rc = dt_record_write(env, oseq->os_lastid_obj,
393                                      &info->fti_buf, &info->fti_off, th);
394                 dt_write_unlock(env, oseq->os_lastid_obj);
395                 if (rc != 0)
396                         GOTO(trans_stop, rc);
397         }
398
399         for (i = 0; i < nr; i++) {
400                 fo = batch[i];
401                 LASSERT(fo);
402
403                 ofd_write_lock(env, fo);
404
405                 /* Only the new created objects need to be recorded. */
406                 if (ofd->ofd_osd->dd_record_fid_accessed) {
407                         struct lfsck_req_local *lrl = &ofd_info(env)->fti_lrl;
408
409                         lfsck_pack_rfa(lrl, lu_object_fid(&fo->ofo_obj.do_lu),
410                                        LEL_FID_ACCESSED, LFSCK_TYPE_LAYOUT);
411                         lfsck_in_notify_local(env, ofd->ofd_osd, lrl, NULL);
412                 }
413
414                 if (likely(!ofd_object_exists(fo) &&
415                            !CFS_FAIL_CHECK(OBD_FAIL_LFSCK_DANGLING))) {
416                         next = ofd_object_child(fo);
417                         LASSERT(next != NULL);
418
419                         rc = dt_create(env, next, &info->fti_attr, NULL,
420                                        &info->fti_dof, th);
421                         ofd_write_unlock(env, fo);
422                         if (rc < 0) {
423                                 if (i == 0)
424                                         GOTO(trans_stop, rc);
425
426                                 rc = 0;
427                                 break;
428                         }
429                         LASSERT(ofd_object_exists(fo));
430                 } else {
431                         ofd_write_unlock(env, fo);
432                 }
433
434                 ofd_seq_last_oid_set(oseq, id + i);
435         }
436
437         objects = i;
438         /* NOT all the wanted objects have been created,
439          * set the LAST_ID as the real created. */
440         if (unlikely(objects < nr)) {
441                 int rc1;
442
443                 info->fti_off = 0;
444                 tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
445                 dt_write_lock(env, oseq->os_lastid_obj, DT_LASTID);
446                 rc1 = dt_record_write(env, oseq->os_lastid_obj,
447                                       &info->fti_buf, &info->fti_off, th);
448                 dt_write_unlock(env, oseq->os_lastid_obj);
449                 if (rc1 != 0)
450                         CERROR("%s: fail to reset the LAST_ID for seq (%#llx"
451                                ") from %llu to %llu\n", ofd_name(ofd),
452                                ostid_seq(&oseq->os_oi), id + nr - 1,
453                                ofd_seq_last_oid(oseq));
454         }
455
456         if (objects)
457                 ofd_precreate_cb_add(env, th, oseq, objects);
458 trans_stop:
459         rc2 = ofd_trans_stop(env, ofd, th, rc);
460         if (rc2)
461                 CERROR("%s: failed to stop transaction: rc = %d\n",
462                        ofd_name(ofd), rc2);
463         if (!rc)
464                 rc = rc2;
465 out:
466         for (i = 0; i < nr_saved; i++) {
467                 fo = batch[i];
468                 if (!fo)
469                         continue;
470                 ofd_object_put(env, fo);
471         }
472         OBD_FREE_PTR_ARRAY(batch, nr_saved);
473
474         CDEBUG((objects == 0 && rc == 0) ? D_ERROR : D_OTHER,
475                "created %d/%d objects: %d\n", objects, nr_saved, rc);
476
477         LASSERT(ergo(objects == 0, rc < 0));
478         RETURN(objects > 0 ? objects : rc);
479 }
480
481 /**
482  * Fix the OFD object ownership.
483  *
484  * If the object still has SUID+SGID bits set, meaning that it was precreated
485  * by the MDT before it was assigned to any file, (see ofd_precreate_objects())
486  * then we will accept the UID/GID/PROJID if sent by the client for initializing
487  * the ownership of this object.  We only allow this to happen once (so clear
488  * these bits) and later only allow setattr.
489  *
490  * \param[in] env        execution environment
491  * \param[in] fo         OFD object
492  * \param[in] la         object attributes
493  * \param[in] is_setattr was this function called from setattr or not
494  *
495  * \retval              0 if successful
496  * \retval              negative value on error
497  */
498 int ofd_attr_handle_id(const struct lu_env *env, struct ofd_object *fo,
499                          struct lu_attr *la, int is_setattr)
500 {
501         struct ofd_thread_info  *info = ofd_info(env);
502         struct lu_attr          *ln = &info->fti_attr2;
503         __u32                    mask = 0;
504         int                      rc;
505
506         ENTRY;
507
508         if (!(la->la_valid & LA_UID) && !(la->la_valid & LA_GID) &&
509             !(la->la_valid & LA_PROJID))
510                 RETURN(0);
511
512         rc = dt_attr_get(env, ofd_object_child(fo), ln);
513         if (rc != 0)
514                 RETURN(rc);
515
516         LASSERT(ln->la_valid & LA_MODE);
517
518         /*
519          * Only allow setattr to change UID/GID/PROJID, if
520          * SUID+SGID is not set which means this is not
521          * initialization of this objects.
522          */
523         if (!is_setattr) {
524                 if (!(ln->la_mode & S_ISUID))
525                         la->la_valid &= ~LA_UID;
526                 if (!(ln->la_mode & S_ISGID))
527                         la->la_valid &= ~LA_GID;
528                 if (!(ln->la_mode & S_ISVTX))
529                         la->la_valid &= ~LA_PROJID;
530         }
531
532         /* Initialize ownership of this object, clear SUID+SGID bits*/
533         if ((la->la_valid & LA_UID) && (ln->la_mode & S_ISUID))
534                 mask |= S_ISUID;
535         if ((la->la_valid & LA_GID) && (ln->la_mode & S_ISGID))
536                 mask |= S_ISGID;
537         if ((la->la_valid & LA_PROJID) && (ln->la_mode & S_ISVTX))
538                 mask |= S_ISVTX;
539         if (mask != 0) {
540                 if (!(la->la_valid & LA_MODE) || !is_setattr) {
541                         la->la_mode = ln->la_mode;
542                         la->la_valid |= LA_MODE;
543                 }
544                 la->la_mode &= ~mask;
545         }
546
547         RETURN(0);
548 }
549
550 /**
551  * Check if it needs to update filter_fid by the value of @oa.
552  *
553  * \param[in] env       env
554  * \param[in] fo        ofd object
555  * \param[in] oa        obdo from client or MDT
556  * \param[out] ff       if filter_fid needs updating, this field is used to
557  *                      return the new buffer
558  *
559  * \retval < 0          error occurred
560  * \retval 0            doesn't need to update filter_fid
561  * \retval FL_XATTR_{CREATE,REPLACE}    flag for xattr update
562  */
563 int ofd_object_ff_update(const struct lu_env *env, struct ofd_object *fo,
564                          const struct obdo *oa, struct filter_fid *ff)
565 {
566         int rc = 0;
567         ENTRY;
568
569         if (!(oa->o_valid &
570               (OBD_MD_FLFID | OBD_MD_FLOSTLAYOUT | OBD_MD_LAYOUT_VERSION)))
571                 RETURN(0);
572
573         rc = ofd_object_ff_load(env, fo);
574         if (rc < 0 && rc != -ENODATA)
575                 RETURN(rc);
576
577         LASSERT(ff != &fo->ofo_ff);
578         if (rc == -ENODATA) {
579                 rc = LU_XATTR_CREATE;
580                 memset(ff, 0, sizeof(*ff));
581         } else {
582                 rc = LU_XATTR_REPLACE;
583                 memcpy(ff, &fo->ofo_ff, sizeof(*ff));
584         }
585
586         if (oa->o_valid & OBD_MD_FLFID) {
587                 /* packing fid and converting it to LE for storing into EA.
588                  * Here ->o_stripe_idx should be filled by LOV and rest of
589                  * fields - by client. */
590                 ff->ff_parent.f_seq = oa->o_parent_seq;
591                 ff->ff_parent.f_oid = oa->o_parent_oid;
592                 /* XXX: we are ignoring o_parent_ver here, since this should
593                  *      be the same for all objects in this fileset. */
594                 ff->ff_parent.f_ver = oa->o_stripe_idx;
595         }
596         if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
597                 ff->ff_layout = oa->o_layout;
598
599         if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
600                 CDEBUG(D_INODE, DFID": OST("DFID") layout version %u -> %u\n",
601                        PFID(&fo->ofo_ff.ff_parent),
602                        PFID(lu_object_fid(&fo->ofo_obj.do_lu)),
603                        ff->ff_layout_version, oa->o_layout_version);
604
605                 /**
606                  * resync write from client on non-primary objects and
607                  * resync start from MDS on primary objects will contain
608                  * LU_LAYOUT_RESYNC flag in the @oa.
609                  *
610                  * The layout version checking for write/punch from client
611                  * happens in ofd_verify_layout_version() before coming to
612                  * here, so that resync with smaller layout version client
613                  * will be rejected there, the biggest resync version will
614                  * be recorded in the OFD objects.
615                  */
616                 if (ff->ff_layout_version & LU_LAYOUT_RESYNC) {
617                         /* this opens a new era of writing */
618                         ff->ff_layout_version = 0;
619                         ff->ff_range = 0;
620                 }
621
622                 /* it's not allowed to change it to a smaller value */
623                 if (ofd_layout_version_less(oa->o_layout_version,
624                                             ff->ff_layout_version))
625                         RETURN(-EINVAL);
626
627                 if (ff->ff_layout_version == 0 ||
628                     oa->o_layout_version & LU_LAYOUT_RESYNC) {
629                         /* if LU_LAYOUT_RESYNC is set, it closes the era of
630                          * writing. Only mirror I/O can write this object. */
631                         ff->ff_layout_version = oa->o_layout_version;
632                         ff->ff_range = 0;
633                 } else if (oa->o_layout_version > ff->ff_layout_version) {
634                         ff->ff_range = max_t(__u32, ff->ff_range,
635                                              oa->o_layout_version -
636                                              ff->ff_layout_version);
637                 }
638         }
639
640         if (memcmp(ff, &fo->ofo_ff, sizeof(*ff)))
641                 filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
642         else /* no change */
643                 rc = 0;
644
645         RETURN(rc);
646 }
647
648 /**
649  * Set OFD object attributes.
650  *
651  * This function sets OFD object attributes taken from incoming request.
652  * It sets not only regular attributes but also XATTR_NAME_FID extended
653  * attribute if needed. The "fid" xattr allows the object's MDT parent inode
654  * to be found and verified by LFSCK and other tools in case of inconsistency.
655  *
656  * \param[in] env       execution environment
657  * \param[in] fo        OFD object
658  * \param[in] la        object attributes
659  * \param[in] oa        obdo carries fid, ost_layout, layout version
660  *
661  * \retval              0 if successful
662  * \retval              negative value on error
663  */
664 int ofd_attr_set(const struct lu_env *env, struct ofd_object *fo,
665                  struct lu_attr *la, struct obdo *oa)
666 {
667         struct ofd_thread_info *info = ofd_info(env);
668         struct ofd_device *ofd = ofd_obj2dev(fo);
669         struct filter_fid *ff = &info->fti_mds_fid;
670         struct thandle *th;
671         int fl, rc, rc2;
672
673         ENTRY;
674
675         if (!ofd_object_exists(fo))
676                 GOTO(out, rc = -ENOENT);
677
678         /* VBR: version recovery check */
679         rc = ofd_version_get_check(info, fo);
680         if (rc)
681                 GOTO(out, rc);
682
683         rc = ofd_attr_handle_id(env, fo, la, 1 /* is_setattr */);
684         if (rc != 0)
685                 GOTO(out, rc);
686
687         th = ofd_trans_create(env, ofd);
688         if (IS_ERR(th))
689                 GOTO(out, rc = PTR_ERR(th));
690
691         rc = dt_declare_attr_set(env, ofd_object_child(fo), la, th);
692         if (rc)
693                 GOTO(stop, rc);
694
695         info->fti_buf.lb_buf = ff;
696         info->fti_buf.lb_len = sizeof(*ff);
697         rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
698                                   XATTR_NAME_FID, 0, th);
699         if (rc)
700                 GOTO(stop, rc);
701
702         rc = ofd_trans_start(env, ofd, la->la_valid & LA_SIZE ? fo : NULL, th);
703         if (rc)
704                 GOTO(stop, rc);
705
706         ofd_write_lock(env, fo);
707         if (!ofd_object_exists(fo))
708                 GOTO(unlock, rc = -ENOENT);
709
710         /* serialize vs ofd_commitrw_write() */
711         if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
712                 tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
713                                info->fti_xid);
714
715         rc = dt_attr_set(env, ofd_object_child(fo), la, th);
716         if (rc)
717                 GOTO(unlock, rc);
718
719         fl = ofd_object_ff_update(env, fo, oa, ff);
720         if (fl < 0)
721                 GOTO(unlock, rc = fl);
722
723         if (fl) {
724                 if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
725                         ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
726                 else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
727                         le32_add_cpu(&ff->ff_parent.f_oid, -1);
728                 else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
729                         GOTO(unlock, rc);
730
731                 info->fti_buf.lb_buf = ff;
732                 info->fti_buf.lb_len = sizeof(*ff);
733                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
734                                   XATTR_NAME_FID, fl, th);
735                 if (!rc)
736                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
737         }
738
739         GOTO(unlock, rc);
740
741 unlock:
742         ofd_write_unlock(env, fo);
743 stop:
744         rc2 = ofd_trans_stop(env, ofd, th, rc);
745         if (rc2)
746                 CERROR("%s: failed to stop transaction: rc = %d\n",
747                        ofd_name(ofd), rc2);
748         if (!rc)
749                 rc = rc2;
750 out:
751         return rc;
752 }
753
754 /**
755  * Fallocate(Preallocate) space for OFD object.
756  *
757  * This function allocates space for the object from the \a start
758  * offset to the \a end offset.
759  *
760  * \param[in] env       execution environment
761  * \param[in] fo        OFD object
762  * \param[in] start     start offset to allocate from
763  * \param[in] end       end of allocate
764  * \param[in] mode      fallocate mode
765  * \param[in] la        object attributes
766  * \param[in] ff        filter_fid structure
767  *
768  * \retval              0 if successful
769  * \retval              negative value on error
770  */
771 int ofd_object_fallocate(const struct lu_env *env, struct ofd_object *fo,
772                          __u64 start, __u64 end, int mode, struct lu_attr *la,
773                          struct obdo *oa)
774 {
775         struct ofd_thread_info *info = ofd_info(env);
776         struct ofd_device *ofd = ofd_obj2dev(fo);
777         struct dt_object *dob = ofd_object_child(fo);
778         struct thandle *th;
779         struct filter_fid *ff = &info->fti_mds_fid;
780         bool ff_needed = false;
781         int rc;
782
783         ENTRY;
784
785         if (!ofd_object_exists(fo))
786                 RETURN(-ENOENT);
787
788         /* VBR: version recovery check */
789         rc = ofd_version_get_check(info, fo);
790         if (rc != 0)
791                 RETURN(rc);
792
793         if (ff != NULL) {
794                 rc = ofd_object_ff_load(env, fo);
795                 if (rc == -ENODATA)
796                         ff_needed = true;
797                 else if (rc < 0)
798                         RETURN(rc);
799
800                 if (ff_needed) {
801                         if (oa->o_valid & OBD_MD_FLFID) {
802                                 ff->ff_parent.f_seq = oa->o_parent_seq;
803                                 ff->ff_parent.f_oid = oa->o_parent_oid;
804                                 ff->ff_parent.f_ver = oa->o_stripe_idx;
805                         }
806                         if (oa->o_valid & OBD_MD_FLOSTLAYOUT)
807                                 ff->ff_layout = oa->o_layout;
808                         if (oa->o_valid & OBD_MD_LAYOUT_VERSION)
809                                 ff->ff_layout_version = oa->o_layout_version;
810                         filter_fid_cpu_to_le(ff, ff, sizeof(*ff));
811                 }
812         }
813
814         th = ofd_trans_create(env, ofd);
815         if (IS_ERR(th))
816                 RETURN(PTR_ERR(th));
817
818         rc = dt_declare_attr_set(env, dob, la, th);
819         if (rc)
820                 GOTO(stop, rc);
821
822         rc = dt_declare_fallocate(env, dob, start, end, mode, th);
823         if (rc)
824                 GOTO(stop, rc);
825
826         if (ff_needed) {
827                 info->fti_buf.lb_buf = ff;
828                 info->fti_buf.lb_len = sizeof(*ff);
829                 rc = dt_declare_xattr_set(env, ofd_object_child(fo),
830                                           &info->fti_buf, XATTR_NAME_FID, 0,
831                                           th);
832                 if (rc)
833                         GOTO(stop, rc);
834         }
835
836         rc = ofd_trans_start(env, ofd, fo, th);
837         if (rc)
838                 GOTO(stop, rc);
839
840         ofd_read_lock(env, fo);
841         if (!ofd_object_exists(fo))
842                 GOTO(unlock, rc = -ENOENT);
843
844         if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
845                 tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
846                                info->fti_xid);
847
848         rc = dt_falloc(env, dob, start, end, mode, th);
849         if (rc)
850                 GOTO(unlock, rc);
851
852         rc = dt_attr_set(env, dob, la, th);
853         if (rc)
854                 GOTO(unlock, rc);
855
856         if (ff_needed) {
857                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
858                                   XATTR_NAME_FID, 0, th);
859                 if (!rc)
860                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
861         }
862 unlock:
863         ofd_read_unlock(env, fo);
864 stop:
865         ofd_trans_stop(env, ofd, th, rc);
866         RETURN(rc);
867 }
868
869 /**
870  * Truncate/punch OFD object.
871  *
872  * This function frees all of the allocated object's space from the \a start
873  * offset to the \a end offset. For truncate() operations the \a end offset
874  * is OBD_OBJECT_EOF. The functionality to punch holes in an object via
875  * fallocate(FALLOC_FL_PUNCH_HOLE) is not yet implemented (see LU-3606).
876  *
877  * \param[in] env       execution environment
878  * \param[in] fo        OFD object
879  * \param[in] start     start offset to punch from
880  * \param[in] end       end of punch
881  * \param[in] la        object attributes
882  * \param[in] oa        obdo struct from incoming request
883  *
884  * \retval              0 if successful
885  * \retval              negative value on error
886  */
887 int ofd_object_punch(const struct lu_env *env, struct ofd_object *fo,
888                      __u64 start, __u64 end, struct lu_attr *la,
889                      struct obdo *oa)
890 {
891         struct ofd_thread_info *info = ofd_info(env);
892         struct ofd_device *ofd = ofd_obj2dev(fo);
893         struct dt_object *dob = ofd_object_child(fo);
894         struct filter_fid *ff = &info->fti_mds_fid;
895         struct thandle *th;
896         int fl, rc, rc2;
897
898         ENTRY;
899
900         /* we support truncate, not punch yet */
901         LASSERT(end == OBD_OBJECT_EOF);
902
903         if (!ofd_object_exists(fo))
904                 GOTO(out, rc = -ENOENT);
905
906         if (ofd->ofd_lfsck_verify_pfid && oa->o_valid & OBD_MD_FLFID) {
907                 rc = ofd_verify_ff(env, fo, oa);
908                 if (rc != 0)
909                         GOTO(out, rc);
910         }
911
912         /* VBR: version recovery check */
913         rc = ofd_version_get_check(info, fo);
914         if (rc)
915                 GOTO(out, rc);
916
917         rc = ofd_attr_handle_id(env, fo, la, 0 /* !is_setattr */);
918         if (rc != 0)
919                 GOTO(out, rc);
920
921         th = ofd_trans_create(env, ofd);
922         if (IS_ERR(th))
923                 GOTO(out, rc = PTR_ERR(th));
924
925         if (oa->o_valid & OBD_MD_FLFLAGS && oa->o_flags & LUSTRE_ENCRYPT_FL) {
926                 /* punch must be aware we are dealing with an encrypted file */
927                 la->la_valid |= LA_FLAGS;
928                 la->la_flags |= LUSTRE_ENCRYPT_FL;
929         }
930         rc = dt_declare_attr_set(env, dob, la, th);
931         if (rc)
932                 GOTO(stop, rc);
933
934         rc = dt_declare_punch(env, dob, start, OBD_OBJECT_EOF, th);
935         if (rc)
936                 GOTO(stop, rc);
937
938         info->fti_buf.lb_buf = ff;
939         info->fti_buf.lb_len = sizeof(*ff);
940         rc = dt_declare_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
941                                   XATTR_NAME_FID, 0, th);
942         if (rc)
943                 GOTO(stop, rc);
944
945         rc = ofd_trans_start(env, ofd, fo, th);
946         if (rc)
947                 GOTO(stop, rc);
948
949         ofd_write_lock(env, fo);
950
951         if (la->la_valid & (LA_ATIME | LA_MTIME | LA_CTIME))
952                 tgt_fmd_update(info->fti_exp, &fo->ofo_header.loh_fid,
953                                info->fti_xid);
954
955         if (!ofd_object_exists(fo))
956                 GOTO(unlock, rc = -ENOENT);
957
958         /* need to verify layout version */
959         if (oa->o_valid & OBD_MD_LAYOUT_VERSION) {
960                 rc = ofd_verify_layout_version(env, fo, oa);
961                 if (rc)
962                         GOTO(unlock, rc);
963         }
964
965         rc = dt_punch(env, dob, start, OBD_OBJECT_EOF, th);
966         if (rc)
967                 GOTO(unlock, rc);
968
969         fl = ofd_object_ff_update(env, fo, oa, ff);
970         if (fl < 0)
971                 GOTO(unlock, rc = fl);
972
973         rc = dt_attr_set(env, dob, la, th);
974         if (rc)
975                 GOTO(unlock, rc);
976
977         if (fl) {
978                 if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR1))
979                         ff->ff_parent.f_oid = cpu_to_le32(1UL << 31);
980                 else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_UNMATCHED_PAIR2))
981                         le32_add_cpu(&ff->ff_parent.f_oid, -1);
982                 else if (CFS_FAIL_CHECK(OBD_FAIL_LFSCK_NOPFID))
983                         GOTO(unlock, rc);
984
985                 info->fti_buf.lb_buf = ff;
986                 info->fti_buf.lb_len = sizeof(*ff);
987                 rc = dt_xattr_set(env, ofd_object_child(fo), &info->fti_buf,
988                                   XATTR_NAME_FID, fl, th);
989                 if (!rc)
990                         filter_fid_le_to_cpu(&fo->ofo_ff, ff, sizeof(*ff));
991         }
992
993         GOTO(unlock, rc);
994
995 unlock:
996         ofd_write_unlock(env, fo);
997 stop:
998         rc2 = ofd_trans_stop(env, ofd, th, rc);
999         if (rc2 != 0)
1000                 CERROR("%s: failed to stop transaction: rc = %d\n",
1001                        ofd_name(ofd), rc2);
1002         if (!rc)
1003                 rc = rc2;
1004 out:
1005         return rc;
1006 }
1007
1008 /**
1009  * Destroy OFD object.
1010  *
1011  * This function destroys OFD object. If object wasn't used at all (orphan)
1012  * then local transaction is used, which means the transaction data is not
1013  * returned back in reply.
1014  *
1015  * \param[in] env       execution environment
1016  * \param[in] fo        OFD object
1017  * \param[in] orphan    flag to indicate that object is orphaned
1018  *
1019  * \retval              0 if successful
1020  * \retval              negative value on error
1021  */
1022 int ofd_destroy(const struct lu_env *env, struct ofd_object *fo,
1023                        int orphan)
1024 {
1025         struct ofd_device       *ofd = ofd_obj2dev(fo);
1026         struct thandle          *th;
1027         int                     rc = 0;
1028         int                     rc2;
1029
1030         ENTRY;
1031
1032         if (!ofd_object_exists(fo))
1033                 GOTO(out, rc = -ENOENT);
1034
1035         th = ofd_trans_create(env, ofd);
1036         if (IS_ERR(th))
1037                 GOTO(out, rc = PTR_ERR(th));
1038
1039         rc = dt_declare_ref_del(env, ofd_object_child(fo), th);
1040         if (rc < 0)
1041                 GOTO(stop, rc);
1042
1043         rc = dt_declare_destroy(env, ofd_object_child(fo), th);
1044         if (rc < 0)
1045                 GOTO(stop, rc);
1046
1047         if (orphan)
1048                 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
1049         else
1050                 rc = ofd_trans_start(env, ofd, NULL, th);
1051         if (rc)
1052                 GOTO(stop, rc);
1053
1054         ofd_write_lock(env, fo);
1055         if (!ofd_object_exists(fo))
1056                 GOTO(unlock, rc = -ENOENT);
1057
1058         tgt_fmd_drop(ofd_info(env)->fti_exp, &fo->ofo_header.loh_fid);
1059
1060         dt_ref_del(env, ofd_object_child(fo), th);
1061         dt_destroy(env, ofd_object_child(fo), th);
1062 unlock:
1063         ofd_write_unlock(env, fo);
1064 stop:
1065         rc2 = ofd_trans_stop(env, ofd, th, rc);
1066         if (rc2)
1067                 CERROR("%s failed to stop transaction: %d\n",
1068                        ofd_name(ofd), rc2);
1069         if (!rc)
1070                 rc = rc2;
1071 out:
1072         RETURN(rc);
1073 }
1074
1075 /**
1076  * Get OFD object attributes.
1077  *
1078  * This function gets OFD object regular attributes. It is used to serve
1079  * incoming request as well as for local OFD purposes.
1080  *
1081  * \param[in] env       execution environment
1082  * \param[in] fo        OFD object
1083  * \param[in] la        object attributes
1084  *
1085  * \retval              0 if successful
1086  * \retval              negative value on error
1087  */
1088 int ofd_attr_get(const struct lu_env *env, struct ofd_object *fo,
1089                  struct lu_attr *la)
1090 {
1091         int rc = 0;
1092
1093         ENTRY;
1094
1095         if (ofd_object_exists(fo)) {
1096                 rc = dt_attr_get(env, ofd_object_child(fo), la);
1097         } else {
1098                 rc = -ENOENT;
1099         }
1100         RETURN(rc);
1101 }