Whamcloud - gitweb
LU-5478 osd: get rid of obd_* typedefs
[fs/lustre-release.git] / lustre / obdclass / llog_osd.c
index 6c80c73..a084301 100644 (file)
@@ -187,6 +187,7 @@ static int llog_osd_read_header(const struct lu_env *env,
        struct llog_rec_hdr     *llh_hdr;
        struct dt_object        *o;
        struct llog_thread_info *lgi;
+       enum llog_flag           flags;
        int                      rc;
 
        ENTRY;
@@ -209,6 +210,8 @@ static int llog_osd_read_header(const struct lu_env *env,
                RETURN(LLOG_EEMPTY);
        }
 
+       flags = handle->lgh_hdr->llh_flags;
+
        lgi->lgi_off = 0;
        lgi->lgi_buf.lb_buf = handle->lgh_hdr;
        lgi->lgi_buf.lb_len = LLOG_CHUNK_SIZE;
@@ -243,6 +246,7 @@ static int llog_osd_read_header(const struct lu_env *env,
                RETURN(-EIO);
        }
 
+       handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
        handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index;
 
        RETURN(0);
@@ -587,6 +591,32 @@ static inline void llog_skip_over(__u64 *off, int curr, int goal)
 }
 
 /**
+ * Remove optional fields that the client doesn't expect.
+ * This is typically in order to ensure compatibility with older clients.
+ * It is assumed that since we exclusively remove fields, the block will be
+ * big enough to handle the remapped records. It is also assumed that records
+ * of a block have the same format (i.e.: the same features enabled).
+ *
+ * \param[in,out]    hdr       Header of the block of records to remap.
+ * \param[in,out]    last_hdr   Last header, don't read past this point.
+ * \param[in]        flags     Flags describing the fields to keep.
+ */
+static void changelog_block_trim_ext(struct llog_rec_hdr *hdr,
+                                    struct llog_rec_hdr *last_hdr,
+                                    enum changelog_rec_flags flags)
+{
+       if (hdr->lrh_type != CHANGELOG_REC)
+               return;
+
+       do {
+               struct changelog_rec *rec = (struct changelog_rec *)(hdr + 1);
+
+               changelog_remap_rec(rec, rec->cr_flags & flags);
+               hdr = llog_rec_hdr_next(hdr);
+       } while ((char *)hdr <= (char *)last_hdr);
+}
+
+/**
  * Implementation of the llog_operations::lop_next_block
  *
  * This function finds the the next llog block to return which contains
@@ -685,7 +715,7 @@ static int llog_osd_next_block(const struct lu_env *env,
                                                sizeof(struct llog_rec_tail));
                /* get the last record in block */
                last_rec = (struct llog_rec_hdr *)((char *)buf + rc -
-                                                  le32_to_cpu(tail->lrt_len));
+                                                  tail->lrt_len);
 
                if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec))
                        lustre_swab_llog_rec(last_rec);
@@ -713,6 +743,12 @@ static int llog_osd_next_block(const struct lu_env *env,
                               rec->lrh_index, next_idx);
                        GOTO(out, rc = -ENOENT);
                }
+
+               /* Trim unsupported extensions for compat w/ older clients */
+               if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID))
+                       changelog_block_trim_ext(rec, last_rec,
+                                                CLF_VERSION | CLF_RENAME);
+
                GOTO(out, rc = 0);
        }
        GOTO(out, rc = -EIO);
@@ -831,6 +867,12 @@ static int llog_osd_prev_block(const struct lu_env *env,
                               rec->lrh_index, prev_idx);
                        GOTO(out, rc = -ENOENT);
                }
+
+               /* Trim unsupported extensions for compat w/ older clients */
+               if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID))
+                       changelog_block_trim_ext(rec, last_rec,
+                                                CLF_VERSION | CLF_RENAME);
+
                GOTO(out, rc = 0);
        }
        GOTO(out, rc = -EIO);
@@ -848,8 +890,8 @@ out:
  * \retval             dt_object of llog directory
  * \retval             ERR_PTR of negative value on error
  */
-struct dt_object *llog_osd_dir_get(const struct lu_env *env,
-                                  struct llog_ctxt *ctxt)
+static struct dt_object *llog_osd_dir_get(const struct lu_env *env,
+                                         struct llog_ctxt *ctxt)
 {
        struct dt_device        *dt;
        struct dt_thread_info   *dti = dt_info(env);
@@ -1020,6 +1062,7 @@ static int llog_osd_declare_create(const struct lu_env *env,
                                   struct llog_handle *res, struct thandle *th)
 {
        struct llog_thread_info         *lgi = llog_info(env);
+       struct dt_insert_rec            *rec = &lgi->lgi_dt_rec;
        struct local_oid_storage        *los;
        struct dt_object                *o;
        int                              rc;
@@ -1051,8 +1094,10 @@ static int llog_osd_declare_create(const struct lu_env *env,
                if (IS_ERR(llog_dir))
                        RETURN(PTR_ERR(llog_dir));
                logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
+               rec->rec_fid = &lgi->lgi_fid;
+               rec->rec_type = S_IFREG;
                rc = dt_declare_insert(env, llog_dir,
-                                      (struct dt_rec *)&lgi->lgi_fid,
+                                      (struct dt_rec *)rec,
                                       (struct dt_key *)res->lgh_name, th);
                lu_object_put(env, &llog_dir->do_lu);
                if (rc)
@@ -1080,6 +1125,7 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res,
                           struct thandle *th)
 {
        struct llog_thread_info *lgi = llog_info(env);
+       struct dt_insert_rec    *rec = &lgi->lgi_dt_rec;
        struct local_oid_storage *los;
        struct dt_object        *o;
        int                      rc = 0;
@@ -1115,9 +1161,10 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res,
                        RETURN(PTR_ERR(llog_dir));
 
                logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
+               rec->rec_fid = &lgi->lgi_fid;
+               rec->rec_type = S_IFREG;
                dt_read_lock(env, llog_dir, 0);
-               rc = dt_insert(env, llog_dir,
-                              (struct dt_rec *)&lgi->lgi_fid,
+               rc = dt_insert(env, llog_dir, (struct dt_rec *)rec,
                               (struct dt_key *)res->lgh_name,
                               th, BYPASS_CAPA, 1);
                dt_read_unlock(env, llog_dir);
@@ -1215,7 +1262,9 @@ static int llog_osd_destroy(const struct lu_env *env,
                        GOTO(out_trans, rc);
        }
 
-       dt_declare_ref_del(env, o, th);
+       rc = dt_declare_ref_del(env, o, th);
+       if (rc < 0)
+               GOTO(out_trans, rc);
 
        rc = dt_declare_destroy(env, o, th);
        if (rc)
@@ -1456,7 +1505,9 @@ out_trans:
        lgi->lgi_buf.lb_buf = idarray;
        lgi->lgi_buf.lb_len = size;
        rc = dt_record_read(env, o, &lgi->lgi_buf, &lgi->lgi_off);
-       if (rc) {
+       /* -EFAULT means the llog is a sparse file. This is not an error
+        * after arbitrary OST index is supported. */
+       if (rc < 0 && rc != -EFAULT) {
                CERROR("%s: error reading CATALOGS: rc = %d\n",
                       o->do_lu.lo_dev->ld_obd->obd_name,  rc);
                GOTO(out, rc);