Whamcloud - gitweb
LU-11186 ofd: fix for a final oid at sequence
[fs/lustre-release.git] / lustre / ofd / ofd_fs.c
index 25c0009..8be08a3 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2014 Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -54,7 +54,7 @@
  * value which shouldn't be exceeded.
  *
  * \param[in] ofd      OFD device
- * \param[in] int      number of updates in the batch
+ * \param[in] batch    number of updates in the batch
  *
  * \retval             \a batch limited by ofd_device::ofd_precreate_batch
  */
@@ -81,7 +81,7 @@ int ofd_precreate_batch(struct ofd_device *ofd, int batch)
  * \retval             pointer to the requested ofd_seq structure
  * \retval             NULL if ofd_seq is not found
  */
-struct ofd_seq *ofd_seq_get(struct ofd_device *ofd, obd_seq seq)
+struct ofd_seq *ofd_seq_get(struct ofd_device *ofd, u64 seq)
 {
        struct ofd_seq *oseq;
 
@@ -111,7 +111,7 @@ void ofd_seq_put(const struct lu_env *env, struct ofd_seq *oseq)
        if (atomic_dec_and_test(&oseq->os_refc)) {
                LASSERT(list_empty(&oseq->os_list));
                LASSERT(oseq->os_lastid_obj != NULL);
-               lu_object_put(env, &oseq->os_lastid_obj->do_lu);
+               dt_object_put(env, oseq->os_lastid_obj);
                OBD_FREE_PTR(oseq);
        }
 }
@@ -157,13 +157,13 @@ static struct ofd_seq *ofd_seq_add(const struct lu_env *env,
 /**
  * Get last object ID for the given sequence.
  *
- * \param[in] ofd_seq  OFD sequence structure
+ * \param[in] oseq     OFD sequence structure
  *
  * \retval             the last object ID for this sequence
  */
-obd_id ofd_seq_last_oid(struct ofd_seq *oseq)
+u64 ofd_seq_last_oid(struct ofd_seq *oseq)
 {
-       obd_id id;
+       u64 id;
 
        spin_lock(&oseq->os_last_oid_lock);
        id = ostid_id(&oseq->os_oi);
@@ -178,11 +178,15 @@ obd_id ofd_seq_last_oid(struct ofd_seq *oseq)
  * \param[in] oseq     OFD sequence
  * \param[in] id       the new OID to set
  */
-void ofd_seq_last_oid_set(struct ofd_seq *oseq, obd_id id)
+void ofd_seq_last_oid_set(struct ofd_seq *oseq, u64 id)
 {
        spin_lock(&oseq->os_last_oid_lock);
-       if (likely(ostid_id(&oseq->os_oi) < id))
-               ostid_set_id(&oseq->os_oi, id);
+       if (likely(ostid_id(&oseq->os_oi) < id)) {
+               if (ostid_set_id(&oseq->os_oi, id)) {
+                       CERROR("Bad %llu to set " DOSTID "\n",
+                              (unsigned long long)id, POSTID(&oseq->os_oi));
+               }
+       }
        spin_unlock(&oseq->os_last_oid_lock);
 }
 
@@ -205,13 +209,16 @@ int ofd_seq_last_oid_write(const struct lu_env *env, struct ofd_device *ofd,
                           struct ofd_seq *oseq)
 {
        struct ofd_thread_info  *info = ofd_info(env);
-       obd_id                   tmp;
+       u64                      tmp;
        struct dt_object        *obj = oseq->os_lastid_obj;
        struct thandle          *th;
        int                      rc;
 
        ENTRY;
 
+       if (ofd->ofd_osd->dd_rdonly)
+               RETURN(0);
+
        tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
 
        info->fti_buf.lb_buf = &tmp;
@@ -249,7 +256,7 @@ out:
  *
  * LWP is lightweight proxy - simplified connection between
  * servers. It is used for FID Location Database (FLDB) and
- * sequence (SEQ) client-server interations.
+ * sequence (SEQ) client-server interactions.
  *
  * This function is used during server cleanup process to free
  * LWP items that were previously set up upon OFD start.
@@ -360,12 +367,12 @@ void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd)
  * \retval             ERR_PTR pointer on error
  */
 struct ofd_seq *ofd_seq_load(const struct lu_env *env, struct ofd_device *ofd,
-                            obd_seq seq)
+                            u64 seq)
 {
        struct ofd_thread_info  *info = ofd_info(env);
        struct ofd_seq          *oseq = NULL;
        struct dt_object        *dob;
-       obd_id                   lastid;
+       u64                      lastid;
        int                      rc;
 
        ENTRY;
@@ -403,13 +410,16 @@ struct ofd_seq *ofd_seq_load(const struct lu_env *env, struct ofd_device *ofd,
 
        atomic_set(&oseq->os_refc, 1);
 
-       rc = dt_attr_get(env, dob, &info->fti_attr, BYPASS_CAPA);
+       rc = dt_attr_get(env, dob, &info->fti_attr);
        if (rc)
                GOTO(cleanup, rc);
 
        if (info->fti_attr.la_size == 0) {
                /* object is just created, initialize last id */
-               ofd_seq_last_oid_set(oseq, OFD_INIT_OBJID);
+               if (OBD_FAIL_CHECK(OBD_FAIL_OFD_SET_OID))
+                       ofd_seq_last_oid_set(oseq, 0xffffff00);
+               else
+                       ofd_seq_last_oid_set(oseq, OFD_INIT_OBJID);
                ofd_seq_last_oid_write(env, ofd, oseq);
        } else if (info->fti_attr.la_size == sizeof(lastid)) {
                info->fti_off = 0;
@@ -424,7 +434,7 @@ struct ofd_seq *ofd_seq_load(const struct lu_env *env, struct ofd_device *ofd,
                }
                ofd_seq_last_oid_set(oseq, le64_to_cpu(lastid));
        } else {
-               CERROR("%s: corrupted size "LPU64" LAST_ID of seq "LPX64"\n",
+               CERROR("%s: corrupted size %llu LAST_ID of seq %#llx\n",
                        ofd_name(ofd), (__u64)info->fti_attr.la_size, seq);
                GOTO(cleanup, rc = -EINVAL);
        }
@@ -573,27 +583,35 @@ int ofd_seqs_init(const struct lu_env *env, struct ofd_device *ofd)
 {
        int rc;
 
+       rwlock_init(&ofd->ofd_seq_list_lock);
+       INIT_LIST_HEAD(&ofd->ofd_seq_list);
+       ofd->ofd_seq_count = 0;
+
        rc = ofd_fid_init(env, ofd);
        if (rc != 0) {
                CERROR("%s: fid init error: rc = %d\n", ofd_name(ofd), rc);
-               return rc;
+               GOTO(out, rc);
        }
 
        rc = ofd_fld_init(env, ofd_name(ofd), ofd);
        if (rc < 0) {
                CERROR("%s: Can't init fld, rc %d\n", ofd_name(ofd), rc);
-               return rc;
+               GOTO(out_fid, rc);
        }
 
        rc = ofd_register_seq_exp(ofd);
        if (rc < 0) {
                CERROR("%s: Can't init seq exp, rc %d\n", ofd_name(ofd), rc);
-               return rc;
+               GOTO(out_fld, rc);
        }
 
-       rwlock_init(&ofd->ofd_seq_list_lock);
-       INIT_LIST_HEAD(&ofd->ofd_seq_list);
-       ofd->ofd_seq_count = 0;
+       RETURN(0);
+
+out_fld:
+       ofd_fld_fini(env, ofd);
+out_fid:
+       ofd_fid_fini(env, ofd);
+out:
        return rc;
 }
 
@@ -621,10 +639,10 @@ int ofd_fs_setup(const struct lu_env *env, struct ofd_device *ofd,
 
        rc = ofd_seqs_init(env, ofd);
        if (rc)
-               GOTO(out_hc, rc);
+               GOTO(out, rc);
 
        if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FS_SETUP))
-               RETURN (-ENOENT);
+               GOTO(out_seqs, rc = -ENOENT);
 
        lu_local_obj_fid(&info->fti_fid, OFD_HEALTH_CHECK_OID);
        memset(&info->fti_attr, 0, sizeof(info->fti_attr));
@@ -635,13 +653,14 @@ int ofd_fs_setup(const struct lu_env *env, struct ofd_device *ofd,
        fo = dt_find_or_create(env, ofd->ofd_osd, &info->fti_fid,
                               &info->fti_dof, &info->fti_attr);
        if (IS_ERR(fo))
-               GOTO(out, rc = PTR_ERR(fo));
+               GOTO(out_seqs, rc = PTR_ERR(fo));
 
        ofd->ofd_health_check_file = fo;
 
        RETURN(0);
-out_hc:
-       lu_object_put(env, &ofd->ofd_health_check_file->do_lu);
+
+out_seqs:
+       ofd_seqs_fini(env, ofd);
 out:
        return rc;
 }
@@ -668,7 +687,7 @@ void ofd_fs_cleanup(const struct lu_env *env, struct ofd_device *ofd)
                      ofd_name(ofd), rc);
 
        if (ofd->ofd_health_check_file) {
-               lu_object_put(env, &ofd->ofd_health_check_file->do_lu);
+               dt_object_put(env, ofd->ofd_health_check_file);
                ofd->ofd_health_check_file = NULL;
        }