Whamcloud - gitweb
LU-5565 osd-ldiskfs: handle non-existing objects
[fs/lustre-release.git] / lustre / osp / osp_md_object.c
index 2b924cd..c32b92f 100644 (file)
@@ -20,7 +20,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2013, Intel Corporation.
+ * Copyright (c) 2013, 2014, Intel Corporation.
  */
 /*
  * lustre/osp/osp_md_object.c
@@ -84,15 +84,10 @@ int osp_md_declare_object_create(const struct lu_env *env,
                                 struct dt_object_format *dof,
                                 struct thandle *th)
 {
-       struct osp_thread_info          *osi = osp_env_info(env);
        struct dt_update_request        *update;
-       struct lu_fid                   *fid1;
-       int                             sizes[2] = {sizeof(struct obdo), 0};
-       char                            *bufs[2] = {NULL, NULL};
-       int                             buf_count;
        int                             rc;
 
-       update = out_find_create_update_loc(th, dt);
+       update = dt_update_request_find_or_create(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
@@ -100,22 +95,6 @@ int osp_md_declare_object_create(const struct lu_env *env,
                return PTR_ERR(update);
        }
 
-       osi->osi_obdo.o_valid = 0;
-       obdo_from_la(&osi->osi_obdo, attr, attr->la_valid);
-       lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
-
-       bufs[0] = (char *)&osi->osi_obdo;
-       buf_count = 1;
-       fid1 = (struct lu_fid *)lu_object_fid(&dt->do_lu);
-       if (hint != NULL && hint->dah_parent) {
-               struct lu_fid *fid2;
-
-               fid2 = (struct lu_fid *)lu_object_fid(&hint->dah_parent->do_lu);
-               sizes[1] = sizeof(*fid2);
-               bufs[1] = (char *)fid2;
-               buf_count++;
-       }
-
        if (lu_object_exists(&dt->do_lu)) {
                /* If the object already exists, we needs to destroy
                 * this orphan object first.
@@ -131,23 +110,27 @@ int osp_md_declare_object_create(const struct lu_env *env,
                 *    but find the object already exists
                 */
                CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
-                      dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
+                      dt->do_lu.lo_dev->ld_obd->obd_name,
+                      PFID(lu_object_fid(&dt->do_lu)));
 
-               rc = out_insert_update(env, update, OUT_REF_DEL, fid1, 0,
-                                      NULL, NULL);
+               rc = out_ref_del_pack(env, &update->dur_buf,
+                                     lu_object_fid(&dt->do_lu),
+                                     update->dur_batchid);
                if (rc != 0)
                        GOTO(out, rc);
 
                if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
                        /* decrease for ".." */
-                       rc = out_insert_update(env, update, OUT_REF_DEL, fid1,
-                                              0, NULL, NULL);
+                       rc = out_ref_del_pack(env, &update->dur_buf,
+                                             lu_object_fid(&dt->do_lu),
+                                             update->dur_batchid);
                        if (rc != 0)
                                GOTO(out, rc);
                }
 
-               rc = out_insert_update(env, update, OUT_DESTROY, fid1, 0, NULL,
-                                      NULL);
+               rc = out_object_destroy_pack(env, &update->dur_buf,
+                                            lu_object_fid(&dt->do_lu),
+                                            update->dur_batchid);
                if (rc != 0)
                        GOTO(out, rc);
 
@@ -157,8 +140,11 @@ int osp_md_declare_object_create(const struct lu_env *env,
                update_inc_batchid(update);
        }
 
-       rc = out_insert_update(env, update, OUT_CREATE, fid1, buf_count, sizes,
-                              (const char **)bufs);
+       rc = out_create_pack(env, &update->dur_buf,
+                            lu_object_fid(&dt->do_lu), attr, hint, dof,
+                            update->dur_batchid);
+       if (rc != 0)
+               GOTO(out, rc);
 out:
        if (rc)
                CERROR("%s: Insert update error: rc = %d\n",
@@ -194,6 +180,7 @@ int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
         * if creation reaches here, it means the object has been created
         * successfully */
        dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
+       dt2osp_obj(dt)->opo_non_exist = 0;
 
        return 0;
 }
@@ -217,10 +204,9 @@ static int osp_md_declare_object_ref_del(const struct lu_env *env,
                                         struct thandle *th)
 {
        struct dt_update_request        *update;
-       struct lu_fid                   *fid;
        int                             rc;
 
-       update = out_find_create_update_loc(th, dt);
+       update = dt_update_request_find_or_create(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
@@ -228,10 +214,9 @@ static int osp_md_declare_object_ref_del(const struct lu_env *env,
                return PTR_ERR(update);
        }
 
-       fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
-
-       rc = out_insert_update(env, update, OUT_REF_DEL, fid, 0, NULL, NULL);
-
+       rc = out_ref_del_pack(env, &update->dur_buf,
+                             lu_object_fid(&dt->do_lu),
+                             update->dur_batchid);
        return rc;
 }
 
@@ -275,10 +260,9 @@ static int osp_md_declare_ref_add(const struct lu_env *env,
                                  struct dt_object *dt, struct thandle *th)
 {
        struct dt_update_request        *update;
-       struct lu_fid                   *fid;
        int                             rc;
 
-       update = out_find_create_update_loc(th, dt);
+       update = dt_update_request_find_or_create(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
@@ -286,9 +270,9 @@ static int osp_md_declare_ref_add(const struct lu_env *env,
                return PTR_ERR(update);
        }
 
-       fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
-
-       rc = out_insert_update(env, update, OUT_REF_ADD, fid, 0, NULL, NULL);
+       rc = out_ref_add_pack(env, &update->dur_buf,
+                             lu_object_fid(&dt->do_lu),
+                             update->dur_batchid);
 
        return rc;
 }
@@ -342,10 +326,7 @@ static void osp_md_ah_init(const struct lu_env *env,
 }
 
 /**
- * Implementation of dt_object_operations::do_declare_attr_get
- *
- * Declare setting attributes of the remote object, i.e. insert remote
- * object attr_set update into RPC.
+ * Add attr_set sub-request into the OUT RPC.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object on which to set attributes
@@ -355,17 +336,13 @@ static void osp_md_ah_init(const struct lu_env *env,
  * \retval             0 if the insertion succeeds.
  * \retval             negative errno if the insertion fails.
  */
-int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
-                           const struct lu_attr *attr, struct thandle *th)
+int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
+                     const struct lu_attr *attr, struct thandle *th)
 {
-       struct osp_thread_info          *osi = osp_env_info(env);
        struct dt_update_request        *update;
-       struct lu_fid                   *fid;
-       int                             size = sizeof(struct obdo);
-       char                            *buf;
        int                             rc;
 
-       update = out_find_create_update_loc(th, dt);
+       update = dt_update_request_find_or_create(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
@@ -373,16 +350,40 @@ int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
                return PTR_ERR(update);
        }
 
-       osi->osi_obdo.o_valid = 0;
-       obdo_from_la(&osi->osi_obdo, (struct lu_attr *)attr,
-                    attr->la_valid);
-       lustre_set_wire_obdo(NULL, &osi->osi_obdo, &osi->osi_obdo);
+       rc = out_attr_set_pack(env, &update->dur_buf,
+                              lu_object_fid(&dt->do_lu), attr,
+                              update->dur_batchid);
 
-       buf = (char *)&osi->osi_obdo;
-       fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
+       return rc;
+}
 
-       rc = out_insert_update(env, update, OUT_ATTR_SET, fid, 1, &size,
-                              (const char **)&buf);
+/**
+ * Implementation of dt_object_operations::do_declare_attr_get
+ *
+ * Declare setting attributes to the specified remote object.
+ *
+ * If the transaction is a remote transaction, then add the modification
+ * sub-request into the OUT RPC here, and such OUT RPC will be triggered
+ * when transaction start.
+ *
+ * \param[in] env      execution environment
+ * \param[in] dt       object on which to set attributes
+ * \param[in] attr     attributes to be set
+ * \param[in] th       the transaction handle
+ *
+ * \retval             0 if the insertion succeeds.
+ * \retval             negative errno if the insertion fails.
+ */
+int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
+                           const struct lu_attr *attr, struct thandle *th)
+{
+       int rc = 0;
+
+       CDEBUG(D_INFO, "declare attr set object "DFID"\n",
+              PFID(&dt->do_lu.lo_header->loh_fid));
+
+       if (!is_only_remote_trans(th))
+               rc = __osp_md_attr_set(env, dt, attr, th);
 
        return rc;
 }
@@ -390,9 +391,13 @@ int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
 /**
  * Implementation of dt_object_operations::do_attr_set
  *
- * Do nothing in this method for now. In DNE phase I, remote updates
- * are actually executed during transaction start, i.e. object attributes
- * have already been set when calling this method.
+ * Set attributes to the specified remote object.
+ *
+ * If the transaction is a remote transaction, then related modification
+ * sub-request has been added in the declare phase and related OUT RPC
+ * has been triggered at transaction start. Otherwise, the modification
+ * sub-request will be added here, and related OUT RPC will be triggered
+ * when transaction stop.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object to set attributes
@@ -406,10 +411,15 @@ int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
                    const struct lu_attr *attr, struct thandle *th,
                    struct lustre_capa *capa)
 {
+       int rc = 0;
+
        CDEBUG(D_INFO, "attr set object "DFID"\n",
               PFID(&dt->do_lu.lo_header->loh_fid));
 
-       RETURN(0);
+       if (is_only_remote_trans(th))
+               rc = __osp_md_attr_set(env, dt, attr, th);
+
+       RETURN(rc);
 }
 
 /**
@@ -530,7 +540,6 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
        struct dt_update_request   *update;
        struct object_update_reply *reply;
        struct ptlrpc_request      *req = NULL;
-       int                        size = strlen((char *)key) + 1;
        struct lu_fid              *fid;
        int                        rc;
        ENTRY;
@@ -539,14 +548,13 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
         * just create an update buffer, instead of attaching the
         * update_remote list of the thandle.
         */
-       update = out_create_update_req(dt_dev);
+       update = dt_update_request_create(dt_dev);
        if (IS_ERR(update))
                RETURN(PTR_ERR(update));
 
-       rc = out_insert_update(env, update, OUT_INDEX_LOOKUP,
-                              lu_object_fid(&dt->do_lu),
-                              1, &size, (const char **)&key);
-       if (rc) {
+       rc = out_index_lookup_pack(env, &update->dur_buf,
+                                  lu_object_fid(&dt->do_lu), rec, key);
+       if (rc != 0) {
                CERROR("%s: Insert update error: rc = %d\n",
                       dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
                GOTO(out, rc);
@@ -596,7 +604,7 @@ out:
        if (req != NULL)
                ptlrpc_req_finished(req);
 
-       out_destroy_update_req(update);
+       dt_update_request_destroy(update);
 
        return rc;
 }
@@ -622,22 +630,10 @@ static int osp_md_declare_insert(const struct lu_env *env,
                                 const struct dt_key *key,
                                 struct thandle *th)
 {
-       struct osp_thread_info     *info = osp_env_info(env);
-       struct dt_update_request   *update;
-       struct dt_insert_rec       *rec1 = (struct dt_insert_rec *)rec;
-       struct lu_fid              *fid =
-                               (struct lu_fid *)lu_object_fid(&dt->do_lu);
-       struct lu_fid              *rec_fid = &info->osi_fid;
-       __u32                       type = cpu_to_le32(rec1->rec_type);
-       int                         size[3] = { strlen((char *)key) + 1,
-                                               sizeof(*rec_fid),
-                                               sizeof(type) };
-       const char                 *bufs[3] = { (char *)key,
-                                               (char *)rec_fid,
-                                               (char *)&type };
-       int                         rc;
-
-       update = out_find_create_update_loc(th, dt);
+       struct dt_update_request *update;
+       int                      rc;
+
+       update = dt_update_request_find_or_create(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
@@ -645,13 +641,9 @@ static int osp_md_declare_insert(const struct lu_env *env,
                return PTR_ERR(update);
        }
 
-       CDEBUG(D_INFO, "%s: insert index of "DFID" %s: "DFID", %u\n",
-              dt->do_lu.lo_dev->ld_obd->obd_name,
-              PFID(fid), (char *)key, PFID(rec1->rec_fid), rec1->rec_type);
-
-       fid_cpu_to_le(rec_fid, rec1->rec_fid);
-       rc = out_insert_update(env, update, OUT_INDEX_INSERT, fid,
-                              ARRAY_SIZE(size), size, bufs);
+       rc = out_index_insert_pack(env, &update->dur_buf,
+                                  lu_object_fid(&dt->do_lu), rec, key,
+                                  update->dur_batchid);
        return rc;
 }
 
@@ -703,11 +695,9 @@ static int osp_md_declare_delete(const struct lu_env *env,
                                 struct thandle *th)
 {
        struct dt_update_request *update;
-       struct lu_fid *fid;
-       int size = strlen((char *)key) + 1;
-       int rc;
+       int                      rc;
 
-       update = out_find_create_update_loc(th, dt);
+       update = dt_update_request_find_or_create(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
@@ -715,11 +705,9 @@ static int osp_md_declare_delete(const struct lu_env *env,
                return PTR_ERR(update);
        }
 
-       fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
-
-       rc = out_insert_update(env, update, OUT_INDEX_DELETE, fid, 1, &size,
-                              (const char **)&key);
-
+       rc = out_index_delete_pack(env, &update->dur_buf,
+                                  lu_object_fid(&dt->do_lu), key,
+                                  update->dur_batchid);
        return rc;
 }
 
@@ -764,7 +752,7 @@ static int osp_md_index_delete(const struct lu_env *env,
  * \retval             1 if it reaches to the end of the index object.
  * \retval             negative errno if the pointer cannot be advanced.
  */
-int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di)
+static int osp_md_index_it_next(const struct lu_env *env, struct dt_it *di)
 {
        struct osp_it           *it = (struct osp_it *)di;
        struct lu_idxpage       *idxpage;
@@ -863,7 +851,7 @@ static int osp_md_index_it_rec(const struct lu_env *env, const struct dt_it *di,
 {
        struct osp_it           *it = (struct osp_it *)di;
        struct lu_dirent        *ent = (struct lu_dirent *)it->ooi_ent;
-       int                     reclen;
+       size_t                  reclen;
 
        reclen = lu_dirent_calc_size(le16_to_cpu(ent->lde_namelen), attr);
        memcpy(rec, ent, reclen);
@@ -1070,13 +1058,9 @@ static ssize_t osp_md_declare_write(const struct lu_env *env,
                                    loff_t pos, struct thandle *th)
 {
        struct dt_update_request  *update;
-       struct lu_fid             *fid;
-       int                       sizes[2] = {buf->lb_len, sizeof(pos)};
-       const char                *bufs[2] = {(char *)buf->lb_buf,
-                                             (char *)&pos};
        ssize_t                   rc;
 
-       update = out_find_create_update_loc(th, dt);
+       update = dt_update_request_find_or_create(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
@@ -1084,11 +1068,8 @@ static ssize_t osp_md_declare_write(const struct lu_env *env,
                return PTR_ERR(update);
        }
 
-       pos = cpu_to_le64(pos);
-       bufs[1] = (char *)&pos;
-       fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
-       rc = out_insert_update(env, update, OUT_WRITE, fid,
-                              ARRAY_SIZE(sizes), sizes, bufs);
+       rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu),
+                           buf, pos, update->dur_batchid);
 
        return rc;
 
@@ -1116,6 +1097,7 @@ static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
                            struct thandle *handle,
                            struct lustre_capa *capa, int ignore_quota)
 {
+       *pos += buf->lb_len;
        return buf->lb_len;
 }