Whamcloud - gitweb
Merge "LU-9771 flr: Merge branch 'flr'"
[fs/lustre-release.git] / lustre / osp / osp_sync.c
index 2180944..1881e5b 100644 (file)
@@ -95,6 +95,9 @@ struct osp_job_req_args {
        __u32                           jra_magic;
 };
 
+static int osp_sync_add_commit_cb(const struct lu_env *env,
+                                 struct osp_device *d, struct thandle *th);
+
 static inline int osp_sync_running(struct osp_device *d)
 {
        return !!(d->opd_sync_thread.t_flags & SVC_RUNNING);
@@ -349,28 +352,6 @@ int osp_sync_declare_add(const struct lu_env *env, struct osp_object *o,
        RETURN(rc);
 }
 
-/* add the commit callback every second */
-int osp_sync_add_commit_cb_1s(const struct lu_env *env, struct osp_device *d,
-                             struct thandle *th)
-{
-       int add = 0;
-
-       /* fast path */
-       if (cfs_time_before(cfs_time_current(), d->opd_sync_next_commit_cb))
-               return 0;
-
-       spin_lock(&d->opd_sync_lock);
-       if (cfs_time_aftereq(cfs_time_current(), d->opd_sync_next_commit_cb))
-               add = 1;
-       d->opd_sync_next_commit_cb = cfs_time_shift(1);
-       spin_unlock(&d->opd_sync_lock);
-
-       if (add == 0)
-               return 0;
-       return osp_sync_add_commit_cb(env, d, th);
-}
-
-
 /**
  * Generate a llog record for a given change.
  *
@@ -400,6 +381,7 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
        struct osp_thread_info  *osi = osp_env_info(env);
        struct llog_ctxt        *ctxt;
        struct thandle          *storage_th;
+       bool                     immediate_commit_cb = false;
        int                      rc;
 
        ENTRY;
@@ -427,11 +409,20 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
                LASSERT(attr);
                osi->osi_setattr.lsr_uid = attr->la_uid;
                osi->osi_setattr.lsr_gid = attr->la_gid;
+               osi->osi_setattr.lsr_layout_version = attr->la_layout_version;
                osi->osi_setattr.lsr_projid = attr->la_projid;
                osi->osi_setattr.lsr_valid =
                        ((attr->la_valid & LA_UID) ? OBD_MD_FLUID : 0) |
                        ((attr->la_valid & LA_GID) ? OBD_MD_FLGID : 0) |
                        ((attr->la_valid & LA_PROJID) ? OBD_MD_FLPROJID : 0);
+               if (attr->la_valid & LA_LAYOUT_VERSION) {
+                       osi->osi_setattr.lsr_valid |= OBD_MD_LAYOUT_VERSION;
+
+                       /* FLR: the layout version has to be transferred to
+                        * OST objects ASAP, otherwise clients will have to
+                        * experience delay to be able to write OST objects. */
+                       immediate_commit_cb = true;
+               }
                break;
        default:
                LBUG();
@@ -461,7 +452,10 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
                atomic_inc(&d->opd_sync_changes);
        }
 
-       rc = osp_sync_add_commit_cb_1s(env, d, th);
+       if (immediate_commit_cb)
+               rc = osp_sync_add_commit_cb(env, d, th);
+       else
+               rc = osp_sync_add_commit_cb_1s(env, d, th);
 
        /* return 0 always here, error case just cause no llog record */
        RETURN(0);
@@ -744,7 +738,7 @@ static int osp_sync_new_setattr_job(struct osp_device *d,
        /* lsr_valid can only be 0 or HAVE OBD_MD_{FLUID, FLGID, FLPROJID} set,
         * so no bits other than these should be set. */
        if ((rec->lsr_valid & ~(OBD_MD_FLUID | OBD_MD_FLGID |
-           OBD_MD_FLPROJID)) != 0) {
+           OBD_MD_FLPROJID | OBD_MD_LAYOUT_VERSION)) != 0) {
                CERROR("%s: invalid setattr record, lsr_valid:%llu\n",
                        d->opd_obd->obd_name, rec->lsr_valid);
                /* return 1 on invalid record */
@@ -761,9 +755,11 @@ static int osp_sync_new_setattr_job(struct osp_device *d,
        body->oa.o_uid = rec->lsr_uid;
        body->oa.o_gid = rec->lsr_gid;
        body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
-       if (h->lrh_len > sizeof(struct llog_setattr64_rec))
-               body->oa.o_projid = ((struct llog_setattr64_rec_v2 *)
-                                     rec)->lsr_projid;
+       if (h->lrh_len > sizeof(struct llog_setattr64_rec)) {
+               struct llog_setattr64_rec_v2 *rec_v2 = (typeof(rec_v2))rec;
+               body->oa.o_projid = rec_v2->lsr_projid;
+               body->oa.o_layout_version = rec_v2->lsr_layout_version;
+       }
 
        /* old setattr record (prior 2.6.0) doesn't have 'valid' stored,
         * we assume that both UID and GID are valid in that case. */
@@ -772,6 +768,12 @@ static int osp_sync_new_setattr_job(struct osp_device *d,
        else
                body->oa.o_valid |= rec->lsr_valid;
 
+       if (body->oa.o_valid & OBD_MD_LAYOUT_VERSION) {
+               OBD_FAIL_TIMEOUT(OBD_FAIL_FLR_LV_DELAY, cfs_fail_val);
+               if (unlikely(OBD_FAIL_CHECK(OBD_FAIL_FLR_LV_INC)))
+                       ++body->oa.o_layout_version;
+       }
+
        osp_sync_send_new_rpc(d, llh, h, req);
        RETURN(0);
 }
@@ -1555,8 +1557,8 @@ void osp_sync_local_commit_cb(struct lu_env *env, struct thandle *th,
        OBD_FREE_PTR(cb);
 }
 
-int osp_sync_add_commit_cb(const struct lu_env *env, struct osp_device *d,
-                          struct thandle *th)
+static int osp_sync_add_commit_cb(const struct lu_env *env,
+                                 struct osp_device *d, struct thandle *th)
 {
        struct osp_last_committed_cb    *cb;
        struct dt_txn_commit_cb         *dcb;
@@ -1586,6 +1588,29 @@ int osp_sync_add_commit_cb(const struct lu_env *env, struct osp_device *d,
        return rc;
 }
 
+/* add the commit callback every second */
+int osp_sync_add_commit_cb_1s(const struct lu_env *env, struct osp_device *d,
+                             struct thandle *th)
+{
+       bool add = false;
+
+       /* fast path */
+       if (cfs_time_before(cfs_time_current(), d->opd_sync_next_commit_cb))
+               return 0;
+
+       spin_lock(&d->opd_sync_lock);
+       if (cfs_time_aftereq(cfs_time_current(), d->opd_sync_next_commit_cb)) {
+               add = true;
+               d->opd_sync_next_commit_cb = cfs_time_shift(1);
+       }
+       spin_unlock(&d->opd_sync_lock);
+
+       if (!add)
+               return 0;
+
+       return osp_sync_add_commit_cb(env, d, th);
+}
+
 /*
  * generate an empty transaction and hook the commit callback in
  * then force transaction commit