Whamcloud - gitweb
LU-3534 osp: move RPC pack from declare to execution phase 94/10794/45
authorWang Di <di.wang@intel.com>
Fri, 20 Jun 2014 12:57:54 +0000 (05:57 -0700)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 9 Apr 2015 03:22:50 +0000 (03:22 +0000)
1. Since we will have full aysnc update support, i.e. do not
need order update anymore, move RPC pack from declare to
execution phase, and these remote updates will be sent during
transaction stop after local transaction is stopped.

2. Add update callback for each update, so after the update request
is done, these callback will be called correspondently for each
update.

3. Remove tu_sent_after_local_trans, because every remote
transaction will be sent after local transaction is finished, in
lod_trans_stop()->osp_trans_stop(). Note: RPC should be sent after
local transaction is stopped, to avoid sending RPC while holding
transaction.

Change-Id: I296e2eaf8b922fe58fe88df074458545f102a69a
Signed-off-by: Wang Di <di.wang@intel.com>
Reviewed-on: http://review.whamcloud.com/10794
Tested-by: Jenkins
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
13 files changed:
lustre/include/lustre_fid.h
lustre/include/lustre_update.h
lustre/lod/lod_object.c
lustre/mdd/mdd_dir.c
lustre/mdd/mdd_object.c
lustre/osd-ldiskfs/osd_scrub.c
lustre/osp/osp_dev.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/osp/osp_sync.c
lustre/osp/osp_trans.c
lustre/target/out_lib.c

index f9aa2c1..f325849 100644 (file)
@@ -236,7 +236,9 @@ enum local_oid {
        MDD_LOV_OBJ_OSEQ        = 4121UL,
        LFSCK_NAMESPACE_OID     = 4122UL,
        REMOTE_PARENT_DIR_OID   = 4123UL,
-       SLAVE_LLOG_CATALOGS_OID = 4124UL,
+       /* This definition is obsolete
+        * SLAVE_LLOG_CATALOGS_OID      = 4124UL,
+        */
 };
 
 static inline void lu_local_obj_fid(struct lu_fid *fid, __u32 oid)
index 861d631..3d941b9 100644 (file)
@@ -182,9 +182,6 @@ static inline void update_inc_batchid(struct dt_update_request *update)
 }
 
 /* target/out_lib.c */
-void dt_update_request_destroy(struct dt_update_request *update);
-struct dt_update_request *dt_update_request_create(struct dt_device *dt);
-
 int out_update_pack(const struct lu_env *env, struct update_buffer *ubuf,
                    enum update_type op, const struct lu_fid *fid,
                    int params_count, __u16 *param_sizes, const void **bufs,
index ad146e6..b6c9a4f 100644 (file)
@@ -2732,6 +2732,14 @@ static int lod_xattr_set(const struct lu_env *env,
 
                        rc = lod_sub_object_xattr_set(env, next, buf, name,
                                                      fl, th);
+               } else if (dt_object_remote(dt)) {
+                       /* This only happens during migration, see
+                        * mdd_migrate_create(), in which Master MDT will
+                        * create a remote target object, and only set
+                        * (migrating) stripe EA on the remote object,
+                        * and does not need creating each stripes. */
+                       rc = lod_sub_object_xattr_set(env, next, buf, name,
+                                                     fl, th);
                } else {
                        rc = lod_striping_create(env, dt, NULL, NULL, th);
                }
index 92b3e8f..24ef562 100644 (file)
@@ -3603,13 +3603,6 @@ static int mdd_migrate_entries(const struct lu_env *env,
                                                name, handle);
                        if (rc != 0)
                                GOTO(out_put, rc);
-
-                       if (is_dir) {
-                               rc = mdo_ref_add(env, mdd_tobj, handle);
-                               if (rc != 0)
-                                       GOTO(out_put, rc);
-
-                       }
                }
 
                rc = __mdd_index_delete(env, mdd_sobj, name, is_dir, handle);
@@ -3724,6 +3717,10 @@ static int mdd_declare_migrate_update_name(const struct lu_env *env,
        if (rc != 0)
                return rc;
 
+       rc = mdd_declare_links_add(env, mdd_tobj, handle, NULL, MLAO_IGNORE);
+       if (rc != 0)
+               return rc;
+
        if (S_ISDIR(mdd_object_type(mdd_sobj))) {
                rc = mdo_declare_ref_add(env, mdd_pobj, handle);
                if (rc != 0)
index 88d2e67..0bfd37d 100644 (file)
@@ -366,8 +366,6 @@ int mdd_object_create_internal(const struct lu_env *env, struct mdd_object *p,
 
        rc = mdo_create_obj(env, c, attr, hint, dof, handle);
 
-       LASSERT(ergo(rc == 0, mdd_object_exists(c)));
-
        RETURN(rc);
 }
 
index 435c0fc..dca0776 100644 (file)
@@ -1653,10 +1653,6 @@ static const struct osd_lf_map osd_lf_maps[] = {
        { "LAST_GROUP", { FID_SEQ_LOCAL_FILE, OFD_LAST_GROUP_OID, 0 },
                OLF_SHOW_NAME, sizeof("LAST_GROUP") - 1, NULL, NULL },
 
-       /* SLAVE_LOG, llog for destroy slave stripes of striped dir */
-       { "SLAVE_LOG", { FID_SEQ_LOCAL_FILE, SLAVE_LLOG_CATALOGS_OID, 0 },
-              OLF_SHOW_NAME, sizeof("SLAVE_LOG") - 1, NULL, NULL },
-
        /* lost+found */
        { "lost+found", { FID_SEQ_LOCAL_FILE, OSD_LPF_OID, 0 },
                OLF_SCAN_SUBITEMS, sizeof("lost+found") - 1,
index db105c4..445dfd2 100644 (file)
@@ -501,9 +501,10 @@ static int osp_shutdown(const struct lu_env *env, struct osp_device *d)
 
        rc = osp_disconnect(d);
 
-       osp_sync_fini(d);
-
        if (!d->opd_connect_mdt) {
+               /* stop sync thread */
+               osp_sync_fini(d);
+
                /* stop precreate thread */
                osp_precreate_fini(d);
 
@@ -689,6 +690,10 @@ static int osp_sync(const struct lu_env *env, struct dt_device *dev)
        unsigned long      start = cfs_time_current();
        ENTRY;
 
+       /* No Sync between MDTs yet. */
+       if (d->opd_connect_mdt)
+               RETURN(0);
+
        if (unlikely(d->opd_imp_active == 0))
                RETURN(-ENOTCONN);
 
@@ -1046,16 +1051,16 @@ static int osp_init0(const struct lu_env *env, struct osp_device *osp,
                rc = osp_init_precreate(osp);
                if (rc)
                        GOTO(out_last_used, rc);
-       }
 
-       /*
-        * Initialize synhronization mechanism taking
-        * care of propogating changes to OST in near
-        * transactional manner.
-        */
-       rc = osp_sync_init(env, osp);
-       if (rc)
-               GOTO(out_precreat, rc);
+               /*
+                * Initialize synhronization mechanism taking
+                * care of propogating changes to OST in near
+                * transactional manner.
+                */
+               rc = osp_sync_init(env, osp);
+               if (rc < 0)
+                       GOTO(out_precreat, rc);
+       }
 
        /*
         * Initiate connect to OST
@@ -1073,8 +1078,9 @@ static int osp_init0(const struct lu_env *env, struct osp_device *osp,
        RETURN(0);
 
 out:
-       /* stop sync thread */
-       osp_sync_fini(osp);
+       if (!osp->opd_connect_mdt)
+               /* stop sync thread */
+               osp_sync_fini(osp);
 out_precreat:
        /* stop precreate thread */
        if (!osp->opd_connect_mdt)
index f5fd23d..fd2d41d 100644 (file)
@@ -299,7 +299,6 @@ struct osp_it {
 struct osp_thandle {
        struct thandle           ot_super;
        struct dt_update_request *ot_dur;
-       bool                     ot_send_updates_after_local_trans:1;
 
        /* OSP will use this thandle to update last oid*/
        struct thandle          *ot_storage_th;
@@ -536,11 +535,11 @@ static inline int osp_is_fid_client(struct osp_device *osp)
        return imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_FID;
 }
 
-typedef int (*osp_async_request_interpreter_t)(const struct lu_env *env,
-                                              struct object_update_reply *rep,
-                                              struct ptlrpc_request *req,
-                                              struct osp_object *obj,
-                                              void *data, int index, int rc);
+typedef int (*osp_update_interpreter_t)(const struct lu_env *env,
+                                       struct object_update_reply *rep,
+                                       struct ptlrpc_request *req,
+                                       struct osp_object *obj,
+                                       void *data, int index, int rc);
 
 /* osp_dev.c */
 void osp_update_last_id(struct osp_device *d, u64 objid);
@@ -550,21 +549,32 @@ extern struct llog_operations osp_mds_ost_orig_logops;
 int osp_insert_async_request(const struct lu_env *env, enum update_type op,
                             struct osp_object *obj, int count, __u16 *lens,
                             const void **bufs, void *data,
-                            osp_async_request_interpreter_t interpreter);
+                            osp_update_interpreter_t interpreter);
+
 int osp_unplug_async_request(const struct lu_env *env,
                             struct osp_device *osp,
                             struct dt_update_request *update);
+int osp_trans_update_request_create(struct thandle *th);
 struct thandle *osp_trans_create(const struct lu_env *env,
                                 struct dt_device *d);
 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
                    struct thandle *th);
+int osp_insert_update_callback(const struct lu_env *env,
+                              struct dt_update_request *update,
+                              struct osp_object *obj, void *data,
+                              osp_update_interpreter_t interpreter);
+int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
+                       const struct object_update_request *ureq,
+                       struct ptlrpc_request **reqp);
+struct dt_update_request *dt_update_request_create(struct dt_device *dt);
+void dt_update_request_destroy(struct dt_update_request *dt_update);
 
 int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
                        const struct object_update_request *ureq,
                        struct ptlrpc_request **reqp);
 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
                    struct dt_update_request *update,
-                   struct ptlrpc_request **reqp, bool rpc_lock);
+                   struct ptlrpc_request **reqp);
 
 struct thandle *osp_get_storage_thandle(const struct lu_env *env,
                                        struct thandle *th,
@@ -585,11 +595,6 @@ int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
 int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
                  const char *name, struct thandle *th);
 
-int osp_declare_object_destroy(const struct lu_env *env,
-                              struct dt_object *dt, struct thandle *th);
-int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
-                      struct thandle *th);
-
 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
                   struct thandle *th);
 
@@ -613,8 +618,10 @@ int osp_md_declare_object_create(const struct lu_env *env,
 int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
                         struct lu_attr *attr, struct dt_allocation_hint *hint,
                         struct dt_object_format *dof, struct thandle *th);
-int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
-                     const struct lu_attr *attr, struct thandle *th);
+int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
+                           const struct lu_attr *attr, struct thandle *th);
+int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
+                   const struct lu_attr *attr, struct thandle *th);
 extern const struct dt_index_operations osp_md_index_ops;
 
 /* osp_precreate.c */
index a587b57..fa86222 100644 (file)
@@ -60,97 +60,40 @@ static const char dot[] = ".";
 static const char dotdot[] = "..";
 
 /**
- * Add OUT_CREATE sub-request into the OUT RPC.
+ * Interpreter call for object creation
  *
- * Note: if the object has already been created, we must add object
- * destroy sub-request ahead of the create, so it will destroy then
- * re-create the object.
+ * Object creation interpreter, which will be called after creating
+ * the remote object to set flags and status.
  *
  * \param[in] env      execution environment
- * \param[in] dt       object to be created
- * \param[in] attr     attribute of the created object
- * \param[in] hint     creation hint
- * \param[in] dof      creation format information
- * \param[in] th       the transaction handle
+ * \param[in] reply    update reply
+ * \param[in] req      ptlrpc update request for creating object
+ * \param[in] obj      object to be created
+ * \param[in] data     data used in this function.
+ * \param[in] index    index(position) of create update in the whole
+ *                      updates
+ * \param[in] rc       update result on the remote MDT.
  *
  * \retval             only return 0 for now
  */
-static int __osp_md_declare_object_create(const struct lu_env *env,
-                                         struct dt_object *dt,
-                                         struct lu_attr *attr,
-                                         struct dt_allocation_hint *hint,
-                                         struct dt_object_format *dof,
-                                         struct thandle *th)
+static int osp_object_create_interpreter(const struct lu_env *env,
+                                        struct object_update_reply *reply,
+                                        struct ptlrpc_request *req,
+                                        struct osp_object *obj,
+                                        void *data, int index, int rc)
 {
-       struct dt_update_request        *update;
-       int                             rc;
-
-       update = thandle_to_dt_update_request(th);
-       LASSERT(update != NULL);
-
-       if (lu_object_exists(&dt->do_lu)) {
-               /* If the object already exists, we needs to destroy
-                * this orphan object first.
-                *
-                * The scenario might happen in this case
-                *
-                * 1. client send remote create to MDT0.
-                * 2. MDT0 send create update to MDT1.
-                * 3. MDT1 finished create synchronously.
-                * 4. MDT0 failed and reboot.
-                * 5. client resend remote create to MDT0.
-                * 6. MDT0 tries to resend create update to MDT1,
-                *    but find the object already exists
-                */
-               CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
-                      dt->do_lu.lo_dev->ld_obd->obd_name,
-                      PFID(lu_object_fid(&dt->do_lu)));
-
-               rc = out_ref_del_pack(env, &update->dur_buf,
-                                     lu_object_fid(&dt->do_lu),
-                                     update->dur_batchid);
-               if (rc != 0)
-                       GOTO(out, rc);
-
-               if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
-                       /* decrease for ".." */
-                       rc = out_ref_del_pack(env, &update->dur_buf,
-                                             lu_object_fid(&dt->do_lu),
-                                             update->dur_batchid);
-                       if (rc != 0)
-                               GOTO(out, rc);
-               }
-
-               rc = out_object_destroy_pack(env, &update->dur_buf,
-                                            lu_object_fid(&dt->do_lu),
-                                            update->dur_batchid);
-               if (rc != 0)
-                       GOTO(out, rc);
-
-               dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
-               /* Increase batchid to add this orphan object deletion
-                * to separate transaction */
-               update_inc_batchid(update);
+       if (rc != 0) {
+               obj->opo_obj.do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
+               obj->opo_non_exist = 1;
        }
-
-       rc = out_create_pack(env, &update->dur_buf,
-                            lu_object_fid(&dt->do_lu), attr, hint, dof,
-                            update->dur_batchid);
-       if (rc != 0)
-               GOTO(out, rc);
-out:
-       if (rc)
-               CERROR("%s: Insert update error: rc = %d\n",
-                      dt->do_lu.lo_dev->ld_obd->obd_name, rc);
-
-       return rc;
+       return 0;
 }
 
 /**
  * Implementation of dt_object_operations::do_declare_create
  *
- * For non-remote transaction, it will add an OUT_CREATE sub-request
- * into the OUT RPC that will be flushed when the transaction start.
+ * Create the dt_update_request to track the update for this OSP
+ * in the transaction.
  *
  * \param[in] env      execution environment
  * \param[in] dt       remote object to be created
@@ -159,8 +102,8 @@ out:
  * \param[in] dof      creation format information
  * \param[in] th       the transaction handle
  *
- * \retval             0 if the insertion succeeds.
- * \retval             negative errno if the insertion fails.
+ * \retval             0 if preparation succeeds.
+ * \retval             negative errno if preparation fails.
  */
 int osp_md_declare_object_create(const struct lu_env *env,
                                 struct dt_object *dt,
@@ -169,28 +112,14 @@ int osp_md_declare_object_create(const struct lu_env *env,
                                 struct dt_object_format *dof,
                                 struct thandle *th)
 {
-       int rc = 0;
-
-       if (!is_only_remote_trans(th)) {
-               rc = __osp_md_declare_object_create(env, dt, attr, hint,
-                                                   dof, th);
-
-               CDEBUG(D_INFO, "declare create md_object "DFID": rc = %d\n",
-                      PFID(&dt->do_lu.lo_header->loh_fid), rc);
-       }
-
-       return rc;
+       return osp_trans_update_request_create(th);
 }
 
 /**
  * Implementation of dt_object_operations::do_create
  *
- * For remote transaction, it will add an OUT_CREATE sub-request into
- * the OUT RPC that will be flushed when the transaction stop.
- *
- * It sets necessary flags for created object. In DNE phase I,
- * remote updates are actually executed during transaction start,
- * i.e. the object has already been created when calling this method.
+ * It adds an OUT_CREATE sub-request into the OUT RPC that will be flushed
+ * when the transaction stop, and sets necessary flags for created object.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object to be created
@@ -199,189 +128,128 @@ int osp_md_declare_object_create(const struct lu_env *env,
  * \param[in] dof      creation format information
  * \param[in] th       the transaction handle
  *
- * \retval             only return 0 for now
+ * \retval             0 if packing creation succeeds.
+ * \retval             negative errno if packing creation fails.
  */
 int osp_md_object_create(const struct lu_env *env, struct dt_object *dt,
                         struct lu_attr *attr, struct dt_allocation_hint *hint,
                         struct dt_object_format *dof, struct thandle *th)
 {
-       int rc = 0;
-
-       if (is_only_remote_trans(th)) {
-               rc = __osp_md_declare_object_create(env, dt, attr, hint,
-                                                   dof, th);
-
-               CDEBUG(D_INFO, "create md_object "DFID": rc = %d\n",
-                      PFID(&dt->do_lu.lo_header->loh_fid), rc);
-       }
-
-       if (rc == 0) {
-               dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS |
-                                                (attr->la_mode & S_IFMT);
-               dt2osp_obj(dt)->opo_non_exist = 0;
-       }
-
-       return rc;
-}
-
-/**
- * Add OUT_REF_DEL sub-request into the OUT RPC.
- *
- * \param[in] env      execution environment
- * \param[in] dt       object to decrease the reference count.
- * \param[in] th       the transaction handle of refcount decrease.
- *
- * \retval             0 if the insertion succeeds.
- * \retval             negative errno if the insertion fails.
- */
-static int __osp_md_ref_del(const struct lu_env *env, struct dt_object *dt,
-                           struct thandle *th)
-{
        struct dt_update_request        *update;
        int                             rc;
 
        update = thandle_to_dt_update_request(th);
        LASSERT(update != NULL);
 
-       rc = out_ref_del_pack(env, &update->dur_buf,
-                             lu_object_fid(&dt->do_lu),
-                             update->dur_batchid);
+       rc = out_create_pack(env, &update->dur_buf,
+                            lu_object_fid(&dt->do_lu), attr, hint, dof,
+                            update->dur_batchid);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       rc = osp_insert_update_callback(env, update, dt2osp_obj(dt), attr,
+                                       osp_object_create_interpreter);
+
+       if (rc < 0)
+               GOTO(out, rc);
+
+       dt->do_lu.lo_header->loh_attr |= LOHA_EXISTS | (attr->la_mode & S_IFMT);
+       dt2osp_obj(dt)->opo_non_exist = 0;
+out:
        return rc;
 }
 
 /**
  * Implementation of dt_object_operations::do_declare_ref_del
  *
- * For non-remote transaction, it will add an OUT_REF_DEL sub-request
- * into the OUT RPC that will be flushed when the transaction start.
+ * Create the dt_update_request to track the update for this OSP
+ * in the transaction.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object to decrease the reference count.
  * \param[in] th       the transaction handle of refcount decrease.
  *
- * \retval             0 if the insertion succeeds.
- * \retval             negative errno if the insertion fails.
+ * \retval             0 if preparation succeeds.
+ * \retval             negative errno if preparation fails.
  */
 static int osp_md_declare_ref_del(const struct lu_env *env,
                                  struct dt_object *dt, struct thandle *th)
 {
-       int rc = 0;
-
-       if (!is_only_remote_trans(th)) {
-               rc = __osp_md_ref_del(env, dt, th);
-
-               CDEBUG(D_INFO, "declare ref del "DFID": rc = %d\n",
-                      PFID(&dt->do_lu.lo_header->loh_fid), rc);
-       }
-
-       return rc;
+       return osp_trans_update_request_create(th);
 }
 
 /**
  * Implementation of dt_object_operations::do_ref_del
  *
- * For remote transaction, it will add an OUT_REF_DEL sub-request into
- * the OUT RPC that will be flushed when the transaction stop.
+ * Add an OUT_REF_DEL sub-request into the OUT RPC that will be
+ * flushed when the transaction stop.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object to decrease the reference count
  * \param[in] th       the transaction handle
  *
- * \retval             only return 0 for now
+ * \retval             0 if packing ref_del succeeds.
+ * \retval             negative errno if packing fails.
  */
 static int osp_md_ref_del(const struct lu_env *env, struct dt_object *dt,
                          struct thandle *th)
 {
-       int rc = 0;
-
-       if (is_only_remote_trans(th)) {
-               rc = __osp_md_ref_del(env, dt, th);
-
-               CDEBUG(D_INFO, "ref del "DFID": rc = %d\n",
-                      PFID(&dt->do_lu.lo_header->loh_fid), rc);
-       }
-
-       return rc;
-}
-
-/**
- * Add OUT_REF_ADD sub-request into the OUT RPC.
- *
- * \param[in] env      execution environment
- * \param[in] dt       object on which to increase the reference count.
- * \param[in] th       the transaction handle.
- *
- * \retval             0 if the insertion succeeds.
- * \retval             negative errno if the insertion fails.
- */
-static int __osp_md_ref_add(const struct lu_env *env, struct dt_object *dt,
-                           struct thandle *th)
-{
        struct dt_update_request        *update;
        int                             rc;
 
        update = thandle_to_dt_update_request(th);
        LASSERT(update != NULL);
 
-       rc = out_ref_add_pack(env, &update->dur_buf,
+       rc = out_ref_del_pack(env, &update->dur_buf,
                              lu_object_fid(&dt->do_lu),
                              update->dur_batchid);
-
        return rc;
 }
 
 /**
  * Implementation of dt_object_operations::do_declare_ref_del
  *
- * For non-remote transaction, it will add an OUT_REF_ADD sub-request
- * into the OUT RPC that will be flushed when the transaction start.
+ * Create the dt_update_request to track the update for this OSP
+ * in the transaction.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object on which to increase the reference count.
  * \param[in] th       the transaction handle.
  *
- * \retval             0 if the insertion succeeds.
- * \retval             negative errno if the insertion fails.
+ * \retval             0 if preparation succeeds.
+ * \retval             negative errno if preparation fails.
  */
 static int osp_md_declare_ref_add(const struct lu_env *env,
                                  struct dt_object *dt, struct thandle *th)
 {
-       int rc = 0;
-
-       if (!is_only_remote_trans(th)) {
-               rc = __osp_md_ref_add(env, dt, th);
-
-               CDEBUG(D_INFO, "declare ref add "DFID": rc = %d\n",
-                      PFID(&dt->do_lu.lo_header->loh_fid), rc);
-       }
-
-       return rc;
+       return osp_trans_update_request_create(th);
 }
 
 /**
  * Implementation of dt_object_operations::do_ref_add
  *
- * For remote transaction, it will add an OUT_REF_ADD sub-request into
- * the OUT RPC that will be flushed when the transaction stop.
+ * Add an OUT_REF_ADD sub-request into the OUT RPC that will be flushed
+ * when the transaction stop.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object on which to increase the reference count
  * \param[in] th       the transaction handle
  *
- * \retval             only return 0 for now
+ * \retval             0 if packing ref_add succeeds.
+ * \retval             negative errno if packing fails.
  */
 static int osp_md_ref_add(const struct lu_env *env, struct dt_object *dt,
                          struct thandle *th)
 {
-       int rc = 0;
-
-       if (is_only_remote_trans(th)) {
-               rc = __osp_md_ref_add(env, dt, th);
+       struct dt_update_request        *update;
+       int                             rc;
 
-               CDEBUG(D_INFO, "ref add "DFID": rc = %d\n",
-                      PFID(&dt->do_lu.lo_header->loh_fid), rc);
-       }
+       update = thandle_to_dt_update_request(th);
+       LASSERT(update != NULL);
 
+       rc = out_ref_add_pack(env, &update->dur_buf,
+                             lu_object_fid(&dt->do_lu),
+                             update->dur_batchid);
        return rc;
 }
 
@@ -412,61 +280,23 @@ static void osp_md_ah_init(const struct lu_env *env,
 }
 
 /**
- * Add OUT_ATTR_SET sub-request into the OUT RPC.
- *
- * \param[in] env      execution environment
- * \param[in] dt       object on which to set attributes
- * \param[in] attr     attributes to be set
- * \param[in] th       the transaction handle
- *
- * \retval             0 if the insertion succeeds.
- * \retval             negative errno if the insertion fails.
- */
-int __osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
-                     const struct lu_attr *attr, struct thandle *th)
-{
-       struct dt_update_request        *update;
-       int                             rc;
-
-       update = thandle_to_dt_update_request(th);
-       LASSERT(update != NULL);
-
-       rc = out_attr_set_pack(env, &update->dur_buf,
-                              lu_object_fid(&dt->do_lu), attr,
-                              update->dur_batchid);
-
-       return rc;
-}
-
-/**
  * Implementation of dt_object_operations::do_declare_attr_get
  *
- * Declare setting attributes to the specified remote object.
- *
- * If the transaction is a non-remote transaction, then add the OUT_ATTR_SET
- * sub-request into the OUT RPC that will be flushed when the transaction start.
+ * Create the dt_update_request to track the update for this OSP
+ * in the transaction.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object on which to set attributes
  * \param[in] attr     attributes to be set
  * \param[in] th       the transaction handle
  *
- * \retval             0 if the insertion succeeds.
- * \retval             negative errno if the insertion fails.
+ * \retval             0 if preparation succeeds.
+ * \retval             negative errno if preparation fails.
  */
 int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
                            const struct lu_attr *attr, struct thandle *th)
 {
-       int rc = 0;
-
-       if (!is_only_remote_trans(th)) {
-               rc = __osp_md_attr_set(env, dt, attr, th);
-
-               CDEBUG(D_INFO, "declare attr set md_object "DFID": rc = %d\n",
-                      PFID(&dt->do_lu.lo_header->loh_fid), rc);
-       }
-
-       return rc;
+       return osp_trans_update_request_create(th);
 }
 
 /**
@@ -474,27 +304,29 @@ int osp_md_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
  *
  * Set attributes to the specified remote object.
  *
- * If the transaction is a remote transaction, then add the OUT_ATTR_SET
- * sub-request into the OUT RPC that will be flushed when the transaction stop.
+ * Add the OUT_ATTR_SET sub-request into the OUT RPC that will be flushed
+ * when the transaction stop.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object to set attributes
  * \param[in] attr     attributes to be set
  * \param[in] th       the transaction handle
  *
- * \retval             only return 0 for now
+ * \retval             0 if packing attr_set succeeds.
+ * \retval             negative errno if packing fails.
  */
 int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
                    const struct lu_attr *attr, struct thandle *th)
 {
-       int rc = 0;
+       struct dt_update_request        *update;
+       int                             rc;
 
-       if (is_only_remote_trans(th)) {
-               rc = __osp_md_attr_set(env, dt, attr, th);
+       update = thandle_to_dt_update_request(th);
+       LASSERT(update != NULL);
 
-               CDEBUG(D_INFO, "attr set md_object "DFID": rc = %d\n",
-                      PFID(&dt->do_lu.lo_header->loh_fid), rc);
-       }
+       rc = out_attr_set_pack(env, &update->dur_buf,
+                              lu_object_fid(&dt->do_lu), attr,
+                              update->dur_batchid);
 
        return rc;
 }
@@ -635,7 +467,7 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
                GOTO(out, rc);
        }
 
-       rc = osp_remote_sync(env, osp, update, &req, false);
+       rc = osp_remote_sync(env, osp, update, &req);
        if (rc < 0)
                GOTO(out, rc);
 
@@ -685,48 +517,10 @@ out:
 }
 
 /**
- * Add OUT_INDEX_INSERT sub-request into the OUT RPC.
- *
- * \param[in] env      execution environment
- * \param[in] dt       object for which to insert index
- * \param[in] rec      record of the index which will be inserted
- * \param[in] key      key of the index which will be inserted
- * \param[in] th       the transaction handle
- *
- * \retval             0 if the insertion succeeds.
- * \retval             negative errno if the insertion fails.
- */
-static int __osp_md_index_insert(const struct lu_env *env,
-                                struct dt_object *dt,
-                                const struct dt_rec *rec,
-                                const struct dt_key *key,
-                                struct thandle *th)
-{
-       struct osp_thandle       *oth = thandle_to_osp_thandle(th);
-       struct dt_update_request *update = oth->ot_dur;
-       int                      rc;
-
-
-       rc = out_index_insert_pack(env, &update->dur_buf,
-                                  lu_object_fid(&dt->do_lu), rec, key,
-                                  update->dur_batchid);
-       if (rc != 0)
-               return rc;
-
-       /* Before async update is allowed, if it will insert remote
-        * name entry, it should make sure the local object is created,
-        * i.e. the remote update RPC should be sent after local
-        * update(create object) */
-       oth->ot_send_updates_after_local_trans = true;
-
-       return rc;
-}
-
-/**
  * Implementation of dt_index_operations::dio_declare_insert
  *
- * For non-remote transaction, it will add an OUT_INDEX_INSERT sub-request
- * into the OUT RPC that will be flushed when the transaction start.
+ * Create the dt_update_request to track the update for this OSP
+ * in the transaction.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object for which to insert index
@@ -734,8 +528,8 @@ static int __osp_md_index_insert(const struct lu_env *env,
  * \param[in] key      key of the index which will be inserted
  * \param[in] th       the transaction handle
  *
- * \retval             0 if the insertion succeeds.
- * \retval             negative errno if the insertion fails.
+ * \retval             0 if preparation succeeds.
+ * \retval             negative errno if preparation fails.
  */
 static int osp_md_declare_index_insert(const struct lu_env *env,
                                       struct dt_object *dt,
@@ -743,25 +537,14 @@ static int osp_md_declare_index_insert(const struct lu_env *env,
                                       const struct dt_key *key,
                                       struct thandle *th)
 {
-       int rc = 0;
-
-       if (!is_only_remote_trans(th)) {
-               rc = __osp_md_index_insert(env, dt, rec, key, th);
-
-               CDEBUG(D_INFO, "declare index insert "DFID" key %s, rec "DFID
-                      ": rc = %d\n", PFID(&dt->do_lu.lo_header->loh_fid),
-                      (char *)key,
-                      PFID(((struct dt_insert_rec *)rec)->rec_fid), rc);
-       }
-
-       return rc;
+       return osp_trans_update_request_create(th);
 }
 
 /**
  * Implementation of dt_index_operations::dio_insert
  *
- * For remote transaction, it will add an OUT_INDEX_INSERT sub-request
- * into the OUT RPC that will be flushed when the transaction stop.
+ * Add an OUT_INDEX_INSERT sub-request into the OUT RPC that will
+ * be flushed when the transaction stop.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object for which to insert index
@@ -770,7 +553,8 @@ static int osp_md_declare_index_insert(const struct lu_env *env,
  * \param[in] th       the transaction handle
  * \param[in] ignore_quota quota enforcement for insert
  *
- * \retval             only return 0 for now
+ * \retval             0 if packing index insert succeeds.
+ * \retval             negative errno if packing fails.
  */
 static int osp_md_index_insert(const struct lu_env *env,
                               struct dt_object *dt,
@@ -779,106 +563,68 @@ static int osp_md_index_insert(const struct lu_env *env,
                               struct thandle *th,
                               int ignore_quota)
 {
-       int rc = 0;
-
-       if (is_only_remote_trans(th)) {
-               rc = __osp_md_index_insert(env, dt, rec, key, th);
-
-               CDEBUG(D_INFO, "index insert "DFID" key %s, rec "DFID
-                      ": rc = %d\n", PFID(&dt->do_lu.lo_header->loh_fid),
-                      (char *)key,
-                      PFID(((struct dt_insert_rec *)rec)->rec_fid), rc);
-       }
-
-       return rc;
-}
-
-/**
- * Add OUT_INDEX_DELETE sub-request into the OUT RPC.
- *
- * \param[in] env      execution environment
- * \param[in] dt       object for which to delete index
- * \param[in] key      key of the index
- * \param[in] th       the transaction handle
- *
- * \retval             0 if the insertion succeeds.
- * \retval             negative errno if the insertion fails.
- */
-static int __osp_md_index_delete(const struct lu_env *env,
-                                struct dt_object *dt,
-                                const struct dt_key *key,
-                                struct thandle *th)
-{
-       struct dt_update_request *update;
+       struct osp_thandle       *oth = thandle_to_osp_thandle(th);
+       struct dt_update_request *update = oth->ot_dur;
        int                      rc;
 
-       update = thandle_to_dt_update_request(th);
-       LASSERT(update != NULL);
 
-       rc = out_index_delete_pack(env, &update->dur_buf,
-                                  lu_object_fid(&dt->do_lu), key,
+       rc = out_index_insert_pack(env, &update->dur_buf,
+                                  lu_object_fid(&dt->do_lu), rec, key,
                                   update->dur_batchid);
+
        return rc;
 }
 
 /**
  * Implementation of dt_index_operations::dio_declare_delete
  *
- * For non-remote transaction, it will add an OUT_INDEX_DELETE sub-request
- * into the OUT RPC that will be flushed when the transaction start.
+ * Create the dt_update_request to track the update for this OSP
+ * in the transaction.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object for which to delete index
  * \param[in] key      key of the index
  * \param[in] th       the transaction handle
  *
- * \retval             0 if the insertion succeeds.
- * \retval             negative errno if the insertion fails.
+ * \retval             0 if preparation succeeds.
+ * \retval             negative errno if preparation fails.
  */
 static int osp_md_declare_index_delete(const struct lu_env *env,
                                       struct dt_object *dt,
                                       const struct dt_key *key,
                                       struct thandle *th)
 {
-       int rc = 0;
-
-       if (!is_only_remote_trans(th)) {
-               rc = __osp_md_index_delete(env, dt, key, th);
-
-               CDEBUG(D_INFO, "declare index delete "DFID" %s: rc = %d\n",
-                      PFID(&dt->do_lu.lo_header->loh_fid), (char *)key, rc);
-       }
-
-       return rc;
+       return osp_trans_update_request_create(th);
 }
 
 /**
  * Implementation of dt_index_operations::dio_delete
  *
- * For remote transaction, it will add an OUT_INDEX_DELETE sub-request
- * into the OUT RPC that will be flushed when the transaction stop.
+ * Add an OUT_INDEX_DELETE sub-request into the OUT RPC that will
+ * be flushed when the transaction stop.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object for which to delete index
  * \param[in] key      key of the index which will be deleted
  * \param[in] th       the transaction handle
  *
- * \retval             only return 0 for now
+ * \retval             0 if packing index delete succeeds.
+ * \retval             negative errno if packing fails.
  */
 static int osp_md_index_delete(const struct lu_env *env,
                               struct dt_object *dt,
                               const struct dt_key *key,
                               struct thandle *th)
 {
-       int rc = 0;
-
-       if (is_only_remote_trans(th)) {
-               rc = __osp_md_index_delete(env, dt, key, th);
+       struct dt_update_request *update;
+       int                      rc;
 
-               CDEBUG(D_INFO, "index delete "DFID" %s: rc = %d\n",
-                      PFID(&dt->do_lu.lo_header->loh_fid), (char *)key, rc);
-       }
+       update = thandle_to_dt_update_request(th);
+       LASSERT(update != NULL);
 
+       rc = out_index_delete_pack(env, &update->dur_buf,
+                                  lu_object_fid(&dt->do_lu), key,
+                                  update->dur_batchid);
        return rc;
 }
 
@@ -1153,6 +899,66 @@ static int osp_md_object_unlock(const struct lu_env *env,
        return 0;
 }
 
+/**
+ * Implement OSP layer dt_object_operations::do_declare_destroy() interface.
+ *
+ * Create the dt_update_request to track the update for this OSP
+ * in the transaction.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] dt       pointer to the OSP layer dt_object to be destroyed
+ * \param[in] th       pointer to the transaction handler
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
+ */
+int osp_md_declare_object_destroy(const struct lu_env *env,
+                              struct dt_object *dt, struct thandle *th)
+{
+       return osp_trans_update_request_create(th);
+}
+
+/**
+ * Implement OSP layer dt_object_operations::do_destroy() interface.
+ *
+ * Pack the destroy update into the RPC buffer, which will be sent
+ * to the remote MDT during transaction stop.
+ *
+ * It also marks the object as non-cached.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] dt       pointer to the OSP layer dt_object to be destroyed
+ * \param[in] th       pointer to the transaction handler
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
+ */
+int osp_md_object_destroy(const struct lu_env *env, struct dt_object *dt,
+                         struct thandle *th)
+{
+       struct osp_object               *o = dt2osp_obj(dt);
+       struct osp_device               *osp = lu2osp_dev(dt->do_lu.lo_dev);
+       struct dt_update_request        *update;
+       int                             rc = 0;
+
+       ENTRY;
+       o->opo_non_exist = 1;
+
+       LASSERT(osp->opd_connect_mdt);
+       update = thandle_to_dt_update_request(th);
+       LASSERT(update != NULL);
+
+       rc = out_object_destroy_pack(env, &update->dur_buf,
+                      lu_object_fid(&dt->do_lu), update->dur_batchid);
+       if (rc != 0)
+               RETURN(rc);
+
+       /* not needed in cache any more */
+       set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
+
+       RETURN(rc);
+}
+
 struct dt_object_operations osp_md_obj_ops = {
        .do_read_lock         = osp_md_object_read_lock,
        .do_write_lock        = osp_md_object_write_lock,
@@ -1165,8 +971,8 @@ struct dt_object_operations osp_md_obj_ops = {
        .do_ref_add           = osp_md_ref_add,
        .do_declare_ref_del   = osp_md_declare_ref_del,
        .do_ref_del           = osp_md_ref_del,
-       .do_declare_destroy   = osp_declare_object_destroy,
-       .do_destroy           = osp_object_destroy,
+       .do_declare_destroy   = osp_md_declare_object_destroy,
+       .do_destroy           = osp_md_object_destroy,
        .do_ah_init           = osp_md_ah_init,
        .do_attr_get          = osp_attr_get,
        .do_declare_attr_set  = osp_md_declare_attr_set,
@@ -1184,42 +990,31 @@ struct dt_object_operations osp_md_obj_ops = {
 /**
  * Implementation of dt_body_operations::dbo_declare_write
  *
- * Declare an object write. In DNE phase I, it will pack the write
- * object update into the RPC.
- *
+ * Create the dt_update_request to track the update for this OSP
+ * in the transaction.
 *
  * \param[in] env      execution environment
  * \param[in] dt       object to be written
  * \param[in] buf      buffer to write which includes an embedded size field
  * \param[in] pos      offet in the object to start writing at
  * \param[in] th       transaction handle
  *
- * \retval             0 if the insertion succeeds.
- * \retval             negative errno if the insertion fails.
+ * \retval             0 if preparation succeeds.
+ * \retval             negative errno if preparation fails.
  */
 static ssize_t osp_md_declare_write(const struct lu_env *env,
                                    struct dt_object *dt,
                                    const struct lu_buf *buf,
                                    loff_t pos, struct thandle *th)
 {
-       struct dt_update_request  *update;
-       ssize_t                   rc;
-
-       update = thandle_to_dt_update_request(th);
-       LASSERT(update != NULL);
-
-       rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu),
-                           buf, pos, update->dur_batchid);
-
-       return rc;
-
+       return osp_trans_update_request_create(th);
 }
 
 /**
  * Implementation of dt_body_operations::dbo_write
  *
- * Return the buffer size. In DNE phase I, remote updates
- * are actually executed during transaction start, the buffer has
- * already been written when this method is being called.
+ * Pack the write object update into the RPC buffer, which will be sent
+ * to the remote MDT during transaction stop.
  *
  * \param[in] env      execution environment
  * \param[in] dt       object to be written
@@ -1228,12 +1023,25 @@ static ssize_t osp_md_declare_write(const struct lu_env *env,
  * \param[in] th       transaction handle
  * \param[in] ignore_quota quota enforcement for this write
  *
- * \retval             the buffer size in bytes.
+ * \retval             the buffer size in bytes if packing succeeds.
+ * \retval             negative errno if packing fails.
  */
 static ssize_t osp_md_write(const struct lu_env *env, struct dt_object *dt,
                            const struct lu_buf *buf, loff_t *pos,
-                           struct thandle *handle, int ignore_quota)
+                           struct thandle *th, int ignore_quota)
 {
+       struct dt_update_request  *update;
+       ssize_t                   rc;
+
+       update = thandle_to_dt_update_request(th);
+       LASSERT(update != NULL);
+
+       rc = out_write_pack(env, &update->dur_buf, lu_object_fid(&dt->do_lu),
+                           buf, *pos, update->dur_batchid);
+       if (rc < 0)
+               return rc;
+
+       /* XXX: how about the write error happened later? */
        *pos += buf->lb_len;
        return buf->lb_len;
 }
index c984151..2898da6 100644 (file)
@@ -575,7 +575,7 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
                GOTO(out, rc);
        }
 
-       rc = osp_remote_sync(env, osp, update, &req, false);
+       rc = osp_remote_sync(env, osp, update, &req);
        if (rc != 0) {
                if (rc == -ENOENT) {
                        osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS;
@@ -612,15 +612,29 @@ out:
        return rc;
 }
 
-static int __osp_attr_set(const struct lu_env *env, struct dt_object *dt,
-                         const struct lu_attr *attr, struct thandle *th)
+/**
+ * Implement OSP layer dt_object_operations::do_declare_attr_set() interface.
+ *
+ * If the transaction is not remote one, then declare the credits that will
+ * be used for the subsequent llog record for the object's attributes.
+ *
+ * \param[in] env      pointer to the thread context
+ * \param[in] dt       pointer to the OSP layer dt_object
+ * \param[in] attr     pointer to the attribute to be set
+ * \param[in] th       pointer to the transaction handler
+ *
+ * \retval             0 for success
+ * \retval             negative error number on failure
+ */
+static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
+                               const struct lu_attr *attr, struct thandle *th)
 {
        struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
        struct osp_object       *o = dt2osp_obj(dt);
-       struct lu_attr          *la;
-       int                      rc = 0;
-       ENTRY;
+       int                     rc;
 
+       if (is_only_remote_trans(th))
+               return osp_md_declare_attr_set(env, dt, attr, th);
        /*
         * Usually we don't allow server stack to manipulate size
         * but there is a special case when striping is created
@@ -649,71 +663,17 @@ static int __osp_attr_set(const struct lu_env *env, struct dt_object *dt,
                LASSERT(!dt_object_exists(dt));
                osp_object_assign_fid(env, d, o);
                rc = osp_object_truncate(env, dt, attr->la_size);
-               if (rc)
+               if (rc != 0)
                        RETURN(rc);
        }
 
        if (!(attr->la_valid & (LA_UID | LA_GID)))
                RETURN(0);
 
-       if (!is_only_remote_trans(th)) {
-               /*
-                * track all UID/GID changes via llog
-                */
-               rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th);
-       } else {
-               /* It is for OST-object attr_set directly without updating
-                * local MDT-object attribute. It is usually used by LFSCK. */
-               rc = __osp_md_attr_set(env, dt, attr, th);
-       }
-
-       if (rc != 0 || o->opo_ooa == NULL)
-               RETURN(rc);
-
-       /* Update the OSP object attributes cache. */
-       la = &o->opo_ooa->ooa_attr;
-       spin_lock(&o->opo_lock);
-       if (attr->la_valid & LA_UID) {
-               la->la_uid = attr->la_uid;
-               la->la_valid |= LA_UID;
-       }
-
-       if (attr->la_valid & LA_GID) {
-               la->la_gid = attr->la_gid;
-               la->la_valid |= LA_GID;
-       }
-       spin_unlock(&o->opo_lock);
-
-       RETURN(0);
-}
-
-/**
- * Implement OSP layer dt_object_operations::do_declare_attr_set() interface.
- *
- * If the transaction is not remote one, then declare the credits that will
- * be used for the subsequent llog record for the object's attributes.
- *
- * \param[in] env      pointer to the thread context
- * \param[in] dt       pointer to the OSP layer dt_object
- * \param[in] attr     pointer to the attribute to be set
- * \param[in] th       pointer to the transaction handler
- *
- * \retval             0 for success
- * \retval             negative error number on failure
- */
-static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
-                               const struct lu_attr *attr, struct thandle *th)
-{
-       int rc = 0;
-
-       if (!is_only_remote_trans(th)) {
-               rc = __osp_attr_set(env, dt, attr, th);
-
-               CDEBUG(D_INFO, "declare set attr "DFID": rc = %d\n",
-                      PFID(&dt->do_lu.lo_header->loh_fid), rc);
-       }
+       /* track all UID/GID changes via llog */
+       rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th);
 
-       return rc;
+       return 0;
 }
 
 /**
@@ -744,24 +704,39 @@ static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
        int                      rc = 0;
        ENTRY;
 
-       if (is_only_remote_trans(th)) {
-               rc = __osp_attr_set(env, dt, attr, th);
+       /* we're interested in uid/gid changes only */
+       if (!(attr->la_valid & (LA_UID | LA_GID)))
+               RETURN(0);
+
+       if (!is_only_remote_trans(th)) {
+               rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr);
+               /* XXX: send new uid/gid to OST ASAP? */
+       } else {
+               struct lu_attr  *la;
 
+               /* It is for OST-object attr_set directly without updating
+                * local MDT-object attribute. It is usually used by LFSCK. */
+               rc = osp_md_attr_set(env, dt, attr, th);
                CDEBUG(D_INFO, "(1) set attr "DFID": rc = %d\n",
                       PFID(&dt->do_lu.lo_header->loh_fid), rc);
 
-               RETURN(rc);
-       }
-
-       /* we're interested in uid/gid changes only */
-       if (!(attr->la_valid & (LA_UID | LA_GID)))
-               RETURN(0);
+               if (rc != 0 || o->opo_ooa == NULL)
+                       RETURN(rc);
 
-       rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr);
-       /* XXX: send new uid/gid to OST ASAP? */
+               /* Update the OSP object attributes cache. */
+               la = &o->opo_ooa->ooa_attr;
+               spin_lock(&o->opo_lock);
+               if (attr->la_valid & LA_UID) {
+                       la->la_uid = attr->la_uid;
+                       la->la_valid |= LA_UID;
+               }
 
-       CDEBUG(D_INFO, "(2) set attr "DFID": rc = %d\n",
-              PFID(&dt->do_lu.lo_header->loh_fid), rc);
+               if (attr->la_valid & LA_GID) {
+                       la->la_gid = attr->la_gid;
+                       la->la_valid |= LA_GID;
+               }
+               spin_unlock(&o->opo_lock);
+       }
 
        RETURN(rc);
 }
@@ -996,7 +971,7 @@ unlock:
                GOTO(out, rc);
        }
 
-       rc = osp_remote_sync(env, osp, update, &req, false);
+       rc = osp_remote_sync(env, osp, update, &req);
        if (rc != 0) {
                if (rc == -ENOENT) {
                        dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
@@ -1110,68 +1085,6 @@ out:
        return rc;
 }
 
-static int __osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
-                          const struct lu_buf *buf, const char *name,
-                          int flag, struct thandle *th)
-{
-       struct osp_object       *o = dt2osp_obj(dt);
-       struct dt_update_request *update;
-       struct osp_xattr_entry  *oxe;
-       int                     rc;
-       ENTRY;
-
-       LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
-       update = thandle_to_dt_update_request(th);
-       LASSERT(update != NULL);
-
-       rc = out_xattr_set_pack(env, &update->dur_buf,
-                               lu_object_fid(&dt->do_lu),
-                               buf, name, flag, update->dur_batchid);
-       if (rc != 0 || o->opo_ooa == NULL)
-               RETURN(rc);
-
-       oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
-       if (oxe == NULL) {
-               CWARN("%s: cannot cache xattr '%s' of "DFID"\n",
-                     dt->do_lu.lo_dev->ld_obd->obd_name,
-                     name, PFID(lu_object_fid(&dt->do_lu)));
-
-               RETURN(0);
-       }
-
-       if (oxe->oxe_buflen - oxe->oxe_namelen - 1 < buf->lb_len) {
-               struct osp_xattr_entry *old = oxe;
-               struct osp_xattr_entry *tmp;
-
-               tmp = osp_oac_xattr_replace(o, &old, buf->lb_len);
-               osp_oac_xattr_put(oxe);
-               oxe = tmp;
-               if (tmp == NULL) {
-                       CWARN("%s: cannot update cached xattr '%s' of "DFID"\n",
-                             dt->do_lu.lo_dev->ld_obd->obd_name,
-                             name, PFID(lu_object_fid(&dt->do_lu)));
-                       spin_lock(&o->opo_lock);
-                       old->oxe_ready = 0;
-                       spin_unlock(&o->opo_lock);
-
-                       RETURN(0);
-               }
-
-               /* Drop the ref for entry on list. */
-               osp_oac_xattr_put(old);
-       }
-
-       spin_lock(&o->opo_lock);
-       oxe->oxe_vallen = buf->lb_len;
-       memcpy(oxe->oxe_value, buf->lb_buf, buf->lb_len);
-       oxe->oxe_exist = 1;
-       oxe->oxe_ready = 1;
-       spin_unlock(&o->opo_lock);
-       osp_oac_xattr_put(oxe);
-
-       RETURN(0);
-}
-
 /**
  * Implement OSP layer dt_object_operations::do_declare_xattr_set() interface.
  *
@@ -1199,16 +1112,7 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
                          const struct lu_buf *buf, const char *name,
                          int flag, struct thandle *th)
 {
-       int rc = 0;
-
-       if (!is_only_remote_trans(th)) {
-               rc = __osp_xattr_set(env, dt, buf, name, flag, th);
-
-               CDEBUG(D_INFO, "declare xattr %s set object "DFID": rc = %d\n",
-                      name, PFID(&dt->do_lu.lo_header->loh_fid), rc);
-       }
-
-       return rc;
+       return osp_trans_update_request_create(th);
 }
 
 /**
@@ -1216,11 +1120,10 @@ int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
  *
  * Set extended attribute to the specified MDT/OST object.
  *
- * If it is remote transaction, it will add an OUT_XATTR_SET sub-request into
- * the OUT RPC that will be flushed when the transaction stop. And if the OSP
- * attributes cache is initialized, then check whether the name extended
- * attribute entry exists in the cache or not. If yes, replace it; otherwise,
- * add the extended attribute to the cache.
+ * Add an OUT_XATTR_SET sub-request into the OUT RPC that will be flushed in
+ * the transaction stop. And if the OSP attributes cache is initialized, then
+ * check whether the name extended attribute entry exists in the cache or not.
+ * If yes, replace it; otherwise, add the extended attribute to the cache.
  *
  * \param[in] env      pointer to the thread context
  * \param[in] dt       pointer to the OSP layer dt_object
@@ -1237,44 +1140,65 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
                  const struct lu_buf *buf, const char *name, int fl,
                  struct thandle *th)
 {
-       int rc = 0;
-
-       if (is_only_remote_trans(th)) {
-               rc = __osp_xattr_set(env, dt, buf, name, fl, th);
-
-               CDEBUG(D_INFO, "xattr %s set object "DFID": rc = %d\n",
-                      name, PFID(&dt->do_lu.lo_header->loh_fid), rc);
-       }
-
-       return rc;
-}
-
-static int __osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
-                          const char *name, struct thandle *th)
-{
+       struct osp_object       *o = dt2osp_obj(dt);
        struct dt_update_request *update;
-       const struct lu_fid      *fid;
-       struct osp_object        *o     = dt2osp_obj(dt);
-       struct osp_xattr_entry   *oxe;
-       int                       rc;
+       struct osp_xattr_entry  *oxe;
+       int                     rc;
+       ENTRY;
 
+       LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
        update = thandle_to_dt_update_request(th);
        LASSERT(update != NULL);
 
-       fid = lu_object_fid(&dt->do_lu);
-
-       rc = out_xattr_del_pack(env, &update->dur_buf, fid, name,
-                               update->dur_batchid);
+       CDEBUG(D_INODE, DFID" set xattr '%s' with size %zd\n",
+              PFID(lu_object_fid(&dt->do_lu)), name, buf->lb_len);
 
+       rc = out_xattr_set_pack(env, &update->dur_buf,
+                               lu_object_fid(&dt->do_lu),
+                               buf, name, fl, update->dur_batchid);
        if (rc != 0 || o->opo_ooa == NULL)
                return rc;
 
-       oxe = osp_oac_xattr_find(o, name, true);
-       if (oxe != NULL)
-               /* Drop the ref for entry on list. */
+       oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
+       if (oxe == NULL) {
+               CWARN("%s: cannot cache xattr '%s' of "DFID"\n",
+                     dt->do_lu.lo_dev->ld_obd->obd_name,
+                     name, PFID(lu_object_fid(&dt->do_lu)));
+
+               RETURN(0);
+       }
+
+       if (oxe->oxe_buflen - oxe->oxe_namelen - 1 < buf->lb_len) {
+               struct osp_xattr_entry *old = oxe;
+               struct osp_xattr_entry *tmp;
+
+               tmp = osp_oac_xattr_replace(o, &old, buf->lb_len);
                osp_oac_xattr_put(oxe);
+               oxe = tmp;
+               if (tmp == NULL) {
+                       CWARN("%s: cannot update cached xattr '%s' of "DFID"\n",
+                             dt->do_lu.lo_dev->ld_obd->obd_name,
+                             name, PFID(lu_object_fid(&dt->do_lu)));
+                       spin_lock(&o->opo_lock);
+                       old->oxe_ready = 0;
+                       spin_unlock(&o->opo_lock);
 
-       return 0;
+                       RETURN(0);
+               }
+
+               /* Drop the ref for entry on list. */
+               osp_oac_xattr_put(old);
+       }
+
+       spin_lock(&o->opo_lock);
+       oxe->oxe_vallen = buf->lb_len;
+       memcpy(oxe->oxe_value, buf->lb_buf, buf->lb_len);
+       oxe->oxe_exist = 1;
+       oxe->oxe_ready = 1;
+       spin_unlock(&o->opo_lock);
+       osp_oac_xattr_put(oxe);
+
+       RETURN(0);
 }
 
 /**
@@ -1299,16 +1223,7 @@ static int __osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
 int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
                          const char *name, struct thandle *th)
 {
-       int rc = 0;
-
-       if (!is_only_remote_trans(th)) {
-               rc = __osp_xattr_del(env, dt, name, th);
-
-               CDEBUG(D_INFO, "declare xattr %s del object "DFID": rc = %d\n",
-                      name, PFID(&dt->do_lu.lo_header->loh_fid), rc);
-       }
-
-       return rc;
+       return osp_trans_update_request_create(th);
 }
 
 /**
@@ -1332,16 +1247,26 @@ int osp_declare_xattr_del(const struct lu_env *env, struct dt_object *dt,
 int osp_xattr_del(const struct lu_env *env, struct dt_object *dt,
                  const char *name, struct thandle *th)
 {
-       int rc = 0;
+       struct dt_update_request *update;
+       const struct lu_fid      *fid = lu_object_fid(&dt->do_lu);
+       struct osp_object        *o = dt2osp_obj(dt);
+       struct osp_xattr_entry   *oxe;
+       int                       rc;
 
-       if (is_only_remote_trans(th)) {
-               rc = __osp_xattr_del(env, dt, name, th);
+       update = thandle_to_dt_update_request(th);
+       LASSERT(update != NULL);
 
-               CDEBUG(D_INFO, "xattr %s del object "DFID": rc = %d\n",
-                      name, PFID(&dt->do_lu.lo_header->loh_fid), rc);
-       }
+       rc = out_xattr_del_pack(env, &update->dur_buf, fid, name,
+                               update->dur_batchid);
+       if (rc != 0 || o->opo_ooa == NULL)
+               return rc;
 
-       return rc;
+       oxe = osp_oac_xattr_find(o, name, true);
+       if (oxe != NULL)
+               /* Drop the ref for entry on list. */
+               osp_oac_xattr_put(oxe);
+
+       return 0;
 }
 
 /**
@@ -1593,13 +1518,12 @@ int osp_declare_object_destroy(const struct lu_env *env,
                               struct dt_object *dt, struct thandle *th)
 {
        struct osp_object       *o = dt2osp_obj(dt);
+       struct osp_device       *osp = lu2osp_dev(dt->do_lu.lo_dev);
        int                      rc = 0;
 
        ENTRY;
 
-       /*
-        * track objects to be destroyed via llog
-        */
+       LASSERT(!osp->opd_connect_mdt);
        rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th);
 
        RETURN(rc);
@@ -1622,20 +1546,22 @@ int osp_declare_object_destroy(const struct lu_env *env,
  * \retval             0 for success
  * \retval             negative error number on failure
  */
-int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
-                      struct thandle *th)
+static int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
+                             struct thandle *th)
 {
        struct osp_object       *o = dt2osp_obj(dt);
+       struct osp_device       *osp = lu2osp_dev(dt->do_lu.lo_dev);
        int                      rc = 0;
 
        ENTRY;
-
        o->opo_non_exist = 1;
-       /*
-        * once transaction is committed put proper command on
-        * the queue going to our OST
-        */
+
+       LASSERT(!osp->opd_connect_mdt);
+       /* once transaction is committed put proper command on
+        * the queue going to our OST. */
        rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL);
+       if (rc < 0)
+               RETURN(rc);
 
        /* not needed in cache any more */
        set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
index 17d90ed..2f9b78a 100644 (file)
@@ -720,82 +720,6 @@ static int osp_sync_new_unlink_job(struct osp_device *d,
 }
 
 /**
- * Prepare OUT-based object destroy RPC.
- *
- * The function allocates a new RPC with OUT format. Then initializes the RPC
- * to contain OUT_DESTROY update against the object specified in the llog
- * record provided by the caller.
- *
- * \param[in] env      LU environment provided by the caller
- * \param[in] osp      OSP device
- * \param[in] llh      llog handle where the record is stored
- * \param[in] h                llog record
- * \param[out] reqp    request prepared
- *
- * \retval 0           on success
- * \retval negative    negated errno on error
- */
-static int osp_prep_unlink_update_req(const struct lu_env *env,
-                                     struct osp_device *osp,
-                                     struct llog_handle *llh,
-                                     struct llog_rec_hdr *h,
-                                     struct ptlrpc_request **reqp)
-{
-       struct llog_unlink64_rec        *rec = (struct llog_unlink64_rec *)h;
-       struct dt_update_request        *update = NULL;
-       struct ptlrpc_request           *req;
-       struct llog_cookie              lcookie;
-       const void                      *buf;
-       __u16                           size;
-       int                             rc;
-       ENTRY;
-
-       update = dt_update_request_create(&osp->opd_dt_dev);
-       if (IS_ERR(update))
-               RETURN(PTR_ERR(update));
-
-       /* This can only happens for unlink slave directory, so decrease
-        * ref for ".." and "." */
-       rc = out_update_pack(env, &update->dur_buf, OUT_REF_DEL, &rec->lur_fid,
-                            0, NULL, NULL, 0);
-       if (rc != 0)
-               GOTO(out, rc);
-
-       rc = out_update_pack(env, &update->dur_buf, OUT_REF_DEL, &rec->lur_fid,
-                            0, NULL, NULL, 0);
-       if (rc != 0)
-               GOTO(out, rc);
-
-       lcookie.lgc_lgl = llh->lgh_id;
-       lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
-       lcookie.lgc_index = h->lrh_index;
-       size = sizeof(lcookie);
-       buf = &lcookie;
-
-       rc = out_update_pack(env, &update->dur_buf, OUT_DESTROY, &rec->lur_fid,
-                            1, &size, &buf, 0);
-       if (rc != 0)
-               GOTO(out, rc);
-
-       rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
-                                update->dur_buf.ub_req, &req);
-       if (rc != 0)
-               GOTO(out, rc);
-
-       req->rq_interpret_reply = osp_sync_interpret;
-       req->rq_commit_cb = osp_sync_request_commit_cb;
-       req->rq_cb_data = osp;
-
-       ptlrpc_request_set_replen(req);
-       *reqp = req;
-out:
-       if (update != NULL)
-               dt_update_request_destroy(update);
-
-       RETURN(rc);
-}
-
-/**
  * Generate a request for unlink change.
  *
  * The function prepares a new RPC, initializes it with unlink(destroy)
@@ -826,27 +750,20 @@ static int osp_sync_new_unlink64_job(const struct lu_env *env,
 
        ENTRY;
        LASSERT(h->lrh_type == MDS_UNLINK64_REC);
+       req = osp_sync_new_job(d, llh, h, OST_DESTROY,
+                              &RQF_OST_DESTROY);
+       if (IS_ERR(req))
+               RETURN(PTR_ERR(req));
 
-       if (d->opd_connect_mdt) {
-               rc = osp_prep_unlink_update_req(env, d, llh, h, &req);
-               if (rc != 0)
-                       RETURN(rc);
-       } else {
-               req = osp_sync_new_job(d, llh, h, OST_DESTROY,
-                                      &RQF_OST_DESTROY);
-               if (IS_ERR(req))
-                       RETURN(PTR_ERR(req));
-
-               body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
-               if (body == NULL)
-                       RETURN(-EFAULT);
-               rc = fid_to_ostid(&rec->lur_fid, &body->oa.o_oi);
-               if (rc < 0)
-                       RETURN(rc);
-               body->oa.o_misc = rec->lur_count;
-               body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID |
-                                  OBD_MD_FLOBJCOUNT;
-       }
+       body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+       if (body == NULL)
+               RETURN(-EFAULT);
+       rc = fid_to_ostid(&rec->lur_fid, &body->oa.o_oi);
+       if (rc < 0)
+               RETURN(rc);
+       body->oa.o_misc = rec->lur_count;
+       body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID |
+                          OBD_MD_FLOBJCOUNT;
        osp_sync_send_new_rpc(d, req);
        RETURN(1);
 }
@@ -1028,27 +945,10 @@ static void osp_sync_process_committed(const struct lu_env *env,
 
                req = container_of((void *)jra, struct ptlrpc_request,
                                   rq_async_args);
-               if (d->opd_connect_mdt) {
-                       struct object_update_request *ureq;
-                       struct object_update *update;
-                       ureq = req_capsule_client_get(&req->rq_pill,
-                                                     &RMF_OUT_UPDATE);
-                       LASSERT(ureq != NULL &&
-                               ureq->ourq_magic == UPDATE_REQUEST_MAGIC);
-
-                       /* 1st/2nd is for decref . and .., 3rd one is for
-                        * destroy, where the log cookie is stored.
-                        * See osp_prep_unlink_update_req */
-                       update = object_update_request_get(ureq, 2, NULL);
-                       LASSERT(update != NULL);
-                       lcookie = object_update_param_get(update, 0, NULL);
-                       LASSERT(lcookie != NULL);
-               } else {
-                       body = req_capsule_client_get(&req->rq_pill,
-                                                     &RMF_OST_BODY);
-                       LASSERT(body);
-                       lcookie = &body->oa.o_lcookie;
-               }
+               body = req_capsule_client_get(&req->rq_pill,
+                                             &RMF_OST_BODY);
+               LASSERT(body);
+               lcookie = &body->oa.o_lcookie;
                /* import can be closing, thus all commit cb's are
                 * called we can check committness directly */
                if (req->rq_transno <= imp->imp_peer_committed_transno) {
@@ -1308,10 +1208,7 @@ static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d)
        OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
        obd->obd_lvfs_ctxt.dt = d->opd_storage;
 
-       if (d->opd_connect_mdt)
-               lu_local_obj_fid(fid, SLAVE_LLOG_CATALOGS_OID);
-       else
-               lu_local_obj_fid(fid, LLOG_CATALOGS_OID);
+       lu_local_obj_fid(fid, LLOG_CATALOGS_OID);
 
        rc = llog_osd_get_cat_list(env, d->opd_storage, d->opd_index, 1,
                                   &osi->osi_cid, fid);
index 3daed73..740270d 100644 (file)
  * will be called one by one to handle each own result.
  *
  *
+ * There are three kinds of transactions
+ *
+ * 1. Local transaction, all of updates of the transaction are in the local MDT.
+ * 2. Remote transaction, all of updates of the transaction are in one remote
+ * MDT, which only happens in LFSCK now.
+ * 3. Distribute transaction, updates for the transaction are in mulitple MDTs.
+ *
  * Author: Di Wang <di.wang@intel.com>
  * Author: Fan, Yong <fan.yong@intel.com>
  */
 
 #include "osp_internal.h"
 
-struct osp_async_update_args {
+/**
+ * The argument for the interpreter callback of osp request.
+ */
+struct osp_update_args {
        struct dt_update_request *oaua_update;
        atomic_t                 *oaua_count;
        wait_queue_head_t        *oaua_waitq;
        bool                      oaua_flow_control;
 };
 
-struct osp_async_request {
+/**
+ * Call back for each update request.
+ */
+struct osp_update_callback {
        /* list in the dt_update_request::dur_cb_items */
-       struct list_head                 oar_list;
+       struct list_head                 ouc_list;
 
        /* The target of the async update request. */
-       struct osp_object               *oar_obj;
+       struct osp_object               *ouc_obj;
 
-       /* The data used by oar_interpreter. */
-       void                            *oar_data;
+       /* The data used by or_interpreter. */
+       void                            *ouc_data;
 
        /* The interpreter function called after the async request handled. */
-       osp_async_request_interpreter_t  oar_interpreter;
+       osp_update_interpreter_t        ouc_interpreter;
 };
 
+static struct object_update_request *object_update_request_alloc(size_t size)
+{
+       struct object_update_request *ourq;
+
+       OBD_ALLOC_LARGE(ourq, size);
+       if (ourq == NULL)
+               return ERR_PTR(-ENOMEM);
+
+       ourq->ourq_magic = UPDATE_REQUEST_MAGIC;
+       ourq->ourq_count = 0;
+
+       return ourq;
+}
+
+static void object_update_request_free(struct object_update_request *ourq,
+                                      size_t ourq_size)
+{
+       if (ourq != NULL)
+               OBD_FREE_LARGE(ourq, ourq_size);
+}
+
 /**
- * Allocate an asynchronous request and initialize it with the given parameters.
+ * Allocate and initialize dt_update_request
+ *
+ * dt_update_request is being used to track updates being executed on
+ * this dt_device(OSD or OSP). The update buffer will be 4k initially,
+ * and increased if needed.
+ *
+ * \param [in] dt      dt device
+ *
+ * \retval             dt_update_request being allocated if succeed
+ * \retval             ERR_PTR(errno) if failed
+ */
+struct dt_update_request *dt_update_request_create(struct dt_device *dt)
+{
+       struct dt_update_request *dt_update;
+       struct object_update_request *ourq;
+
+       OBD_ALLOC_PTR(dt_update);
+       if (dt_update == NULL)
+               return ERR_PTR(-ENOMEM);
+
+       ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE);
+       if (IS_ERR(ourq)) {
+               OBD_FREE_PTR(dt_update);
+               return ERR_CAST(ourq);
+       }
+
+       dt_update->dur_buf.ub_req = ourq;
+       dt_update->dur_buf.ub_req_size = OUT_UPDATE_INIT_BUFFER_SIZE;
+
+       dt_update->dur_dt = dt;
+       dt_update->dur_batchid = 0;
+       INIT_LIST_HEAD(&dt_update->dur_cb_items);
+
+       return dt_update;
+}
+
+/**
+ * Destroy dt_update_request
+ *
+ * \param [in] dt_update       dt_update_request being destroyed
+ */
+void dt_update_request_destroy(struct dt_update_request *dt_update)
+{
+       if (dt_update == NULL)
+               return;
+
+       object_update_request_free(dt_update->dur_buf.ub_req,
+                                  dt_update->dur_buf.ub_req_size);
+       OBD_FREE_PTR(dt_update);
+}
+
+/**
+ * Allocate an osp request and initialize it with the given parameters.
  *
  * \param[in] obj              pointer to the operation target
  * \param[in] data             pointer to the data used by the interpreter
@@ -92,38 +178,38 @@ struct osp_async_request {
  * \retval                     pointer to the asychronous request
  * \retval                     NULL if the allocation failed
  */
-static struct osp_async_request *
-osp_async_request_init(struct osp_object *obj, void *data,
-                      osp_async_request_interpreter_t interpreter)
+static struct osp_update_callback *
+osp_update_callback_init(struct osp_object *obj, void *data,
+                        osp_update_interpreter_t interpreter)
 {
-       struct osp_async_request *oar;
+       struct osp_update_callback *ouc;
 
-       OBD_ALLOC_PTR(oar);
-       if (oar == NULL)
+       OBD_ALLOC_PTR(ouc);
+       if (ouc == NULL)
                return NULL;
 
        lu_object_get(osp2lu_obj(obj));
-       INIT_LIST_HEAD(&oar->oar_list);
-       oar->oar_obj = obj;
-       oar->oar_data = data;
-       oar->oar_interpreter = interpreter;
+       INIT_LIST_HEAD(&ouc->ouc_list);
+       ouc->ouc_obj = obj;
+       ouc->ouc_data = data;
+       ouc->ouc_interpreter = interpreter;
 
-       return oar;
+       return ouc;
 }
 
 /**
- * Destroy the asychronous request.
+ * Destroy the osp_update_callback.
  *
  * \param[in] env      pointer to the thread context
- * \param[in] oar      pointer to asychronous request
+ * \param[in] ouc      pointer to osp_update_callback
  */
-static void osp_async_request_fini(const struct lu_env *env,
-                                  struct osp_async_request *oar)
+static void osp_update_callback_fini(const struct lu_env *env,
+                                    struct osp_update_callback *ouc)
 {
-       LASSERT(list_empty(&oar->oar_list));
+       LASSERT(list_empty(&ouc->ouc_list));
 
-       lu_object_put(env, osp2lu_obj(oar->oar_obj));
-       OBD_FREE_PTR(oar);
+       lu_object_put(env, osp2lu_obj(ouc->ouc_obj));
+       OBD_FREE_PTR(ouc);
 }
 
 /**
@@ -140,15 +226,14 @@ static void osp_async_request_fini(const struct lu_env *env,
  * \retval             0 for success
  * \retval             negative error number on failure
  */
-static int osp_async_update_interpret(const struct lu_env *env,
-                                     struct ptlrpc_request *req,
-                                     void *arg, int rc)
+static int osp_update_interpret(const struct lu_env *env,
+                               struct ptlrpc_request *req, void *arg, int rc)
 {
        struct object_update_reply      *reply  = NULL;
-       struct osp_async_update_args    *oaua   = arg;
+       struct osp_update_args          *oaua   = arg;
        struct dt_update_request        *dt_update = oaua->oaua_update;
-       struct osp_async_request        *oar;
-       struct osp_async_request        *next;
+       struct osp_update_callback      *ouc;
+       struct osp_update_callback      *next;
        int                              count  = 0;
        int                              index  = 0;
        int                              rc1    = 0;
@@ -170,9 +255,9 @@ static int osp_async_update_interpret(const struct lu_env *env,
                rc1 = rc;
        }
 
-       list_for_each_entry_safe(oar, next, &dt_update->dur_cb_items,
-                                oar_list) {
-               list_del_init(&oar->oar_list);
+       list_for_each_entry_safe(ouc, next, &dt_update->dur_cb_items,
+                                ouc_list) {
+               list_del_init(&ouc->ouc_list);
 
                /* The peer may only have handled some requests (indicated
                 * by the 'count') in the packaged OUT RPC, we can only get
@@ -191,9 +276,11 @@ static int osp_async_update_interpret(const struct lu_env *env,
                                rc1 = -EINVAL;
                }
 
-               oar->oar_interpreter(env, reply, req, oar->oar_obj,
-                                      oar->oar_data, index, rc1);
-               osp_async_request_fini(env, oar);
+               if (ouc->ouc_interpreter != NULL)
+                       ouc->ouc_interpreter(env, reply, req, ouc->ouc_obj,
+                                            ouc->ouc_data, index, rc1);
+
+               osp_update_callback_fini(env, ouc);
                index++;
        }
 
@@ -220,22 +307,24 @@ int osp_unplug_async_request(const struct lu_env *env,
                             struct osp_device *osp,
                             struct dt_update_request *update)
 {
-       struct osp_async_update_args    *args;
-       struct ptlrpc_request           *req = NULL;
-       int                              rc;
+       struct osp_update_args  *args;
+       struct ptlrpc_request   *req = NULL;
+       int                      rc;
 
        rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
                                 update->dur_buf.ub_req, &req);
        if (rc != 0) {
-               struct osp_async_request *oar;
-               struct osp_async_request *next;
-
-               list_for_each_entry_safe(oar, next,
-                                        &update->dur_cb_items, oar_list) {
-                       list_del_init(&oar->oar_list);
-                       oar->oar_interpreter(env, NULL, NULL, oar->oar_obj,
-                                              oar->oar_data, 0, rc);
-                       osp_async_request_fini(env, oar);
+               struct osp_update_callback *ouc;
+               struct osp_update_callback *next;
+
+               list_for_each_entry_safe(ouc, next,
+                                        &update->dur_cb_items, ouc_list) {
+                       list_del_init(&ouc->ouc_list);
+                       if (ouc->ouc_interpreter != NULL)
+                               ouc->ouc_interpreter(env, NULL, NULL,
+                                                    ouc->ouc_obj,
+                                                    ouc->ouc_data, 0, rc);
+                       osp_update_callback_fini(env, ouc);
                }
                dt_update_request_destroy(update);
        } else {
@@ -244,7 +333,7 @@ int osp_unplug_async_request(const struct lu_env *env,
                args->oaua_count = NULL;
                args->oaua_waitq = NULL;
                args->oaua_flow_control = false;
-               req->rq_interpret_reply = osp_async_update_interpret;
+               req->rq_interpret_reply = osp_update_interpret;
                ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
        }
 
@@ -279,6 +368,37 @@ osp_find_or_create_async_update_request(struct osp_device *osp)
 }
 
 /**
+ * Insert an osp_update_callback into the dt_update_request.
+ *
+ * Insert an osp_update_callback to the dt_update_request. Usually each update
+ * in the dt_update_request will have one correspondent callback, and these
+ * callbacks will be called in rq_interpret_reply.
+ *
+ * \param[in] env              pointer to the thread context
+ * \param[in] obj              pointer to the operation target object
+ * \param[in] data             pointer to the data used by the interpreter
+ * \param[in] interpreter      pointer to the interpreter function
+ *
+ * \retval                     0 for success
+ * \retval                     negative error number on failure
+ */
+int osp_insert_update_callback(const struct lu_env *env,
+                              struct dt_update_request *update,
+                              struct osp_object *obj, void *data,
+                              osp_update_interpreter_t interpreter)
+{
+       struct osp_update_callback  *ouc;
+
+       ouc = osp_update_callback_init(obj, data, interpreter);
+       if (ouc == NULL)
+               RETURN(-ENOMEM);
+
+       list_add_tail(&ouc->ouc_list, &update->dur_cb_items);
+
+       return 0;
+}
+
+/**
  * Insert an asynchronous idempotent request to the shared request queue that
  * is attached to the osp_device.
  *
@@ -305,21 +425,16 @@ osp_find_or_create_async_update_request(struct osp_device *osp)
 int osp_insert_async_request(const struct lu_env *env, enum update_type op,
                             struct osp_object *obj, int count,
                             __u16 *lens, const void **bufs, void *data,
-                            osp_async_request_interpreter_t interpreter)
+                            osp_update_interpreter_t interpreter)
 {
-       struct osp_async_request     *oar;
        struct osp_device            *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
        struct dt_update_request     *update;
        int                           rc  = 0;
        ENTRY;
 
-       oar = osp_async_request_init(obj, data, interpreter);
-       if (oar == NULL)
-               RETURN(-ENOMEM);
-
        update = osp_find_or_create_async_update_request(osp);
        if (IS_ERR(update))
-               GOTO(out, rc = PTR_ERR(update));
+               RETURN(PTR_ERR(update));
 
 again:
        /* The queue is full. */
@@ -333,27 +448,37 @@ again:
                rc = osp_unplug_async_request(env, osp, update);
                mutex_lock(&osp->opd_async_requests_mutex);
                if (rc != 0)
-                       GOTO(out, rc);
+                       RETURN(rc);
 
                update = osp_find_or_create_async_update_request(osp);
                if (IS_ERR(update))
-                       GOTO(out, rc = PTR_ERR(update));
+                       RETURN(PTR_ERR(update));
 
                goto again;
        }
 
-       if (rc == 0)
-               list_add_tail(&oar->oar_list, &update->dur_cb_items);
+       rc = osp_insert_update_callback(env, update, obj, data, interpreter);
 
-       GOTO(out, rc);
+       RETURN(rc);
+}
 
-out:
-       if (rc != 0)
-               osp_async_request_fini(env, oar);
+int osp_trans_update_request_create(struct thandle *th)
+{
+       struct osp_thandle              *oth = thandle_to_osp_thandle(th);
+       struct dt_update_request        *update;
 
-       return rc;
-}
+       if (oth->ot_dur != NULL)
+               return 0;
 
+       update = dt_update_request_create(th->th_dev);
+       if (IS_ERR(update)) {
+               th->th_result = PTR_ERR(update);
+               return PTR_ERR(update);
+       }
+
+       oth->ot_dur = update;
+       return 0;
+}
 /**
  * The OSP layer dt_device_operations::dt_trans_create() interface
  * to create a transaction.
@@ -385,7 +510,6 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
 {
        struct osp_thandle              *oth;
        struct thandle                  *th = NULL;
-       struct dt_update_request        *update;
        ENTRY;
 
        OBD_ALLOC_PTR(oth);
@@ -396,15 +520,6 @@ struct thandle *osp_trans_create(const struct lu_env *env, struct dt_device *d)
        th->th_dev = d;
        th->th_tags = LCT_TX_HANDLE;
 
-       update = dt_update_request_create(d);
-       if (IS_ERR(update)) {
-               OBD_FREE_PTR(oth);
-               RETURN(ERR_CAST(update));
-       }
-
-       oth->ot_dur = update;
-       oth->ot_send_updates_after_local_trans = false;
-
        RETURN(th);
 }
 
@@ -475,7 +590,7 @@ int osp_prep_update_req(const struct lu_env *env, struct obd_import *imp,
  */
 int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
                    struct dt_update_request *dt_update,
-                   struct ptlrpc_request **reqp, bool rpc_lock)
+                   struct ptlrpc_request **reqp)
 {
        struct obd_import       *imp = osp->opd_obd->u.cli.cl_import;
        struct ptlrpc_request   *req = NULL;
@@ -488,11 +603,7 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
 
        /* Note: some dt index api might return non-zero result here, like
         * osd_index_ea_lookup, so we should only check rc < 0 here */
-       if (rpc_lock)
-               osp_get_rpc_lock(osp);
        rc = ptlrpc_queue_wait(req);
-       if (rpc_lock)
-               osp_put_rpc_lock(osp);
        if (rc < 0) {
                ptlrpc_req_finished(req);
                dt_update->dur_rc = rc;
@@ -514,9 +625,8 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
 /**
  * Trigger the request for remote updates.
  *
- * If the transaction is not a remote one or it is required to be sync mode
- * (th->th_sync is set), then it will be sent synchronously; otherwise, the
- * RPC will be sent asynchronously.
+ * If th_sync is set, then the request will be sent synchronously,
+ * otherwise, the RPC will be sent asynchronously.
  *
  * Please refer to osp_trans_create() for transaction type.
  *
@@ -524,44 +634,50 @@ int osp_remote_sync(const struct lu_env *env, struct osp_device *osp,
  * \param[in] osp              pointer to the OSP device
  * \param[in] dt_update                pointer to the dt_update_request
  * \param[in] th               pointer to the transaction handler
- * \param[in] flow_control     whether need to control the flow
+ * \param[out] sent            whether the RPC has been sent
  *
  * \retval                     0 for success
  * \retval                     negative error number on failure
  */
 static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
                             struct dt_update_request *dt_update,
-                            struct thandle *th, bool flow_control)
+                            struct thandle *th, int *sent)
 {
+       struct osp_update_args  *args;
+       struct ptlrpc_request   *req;
        int     rc = 0;
+       ENTRY;
 
-       if (is_only_remote_trans(th) && !th->th_sync) {
-               struct osp_async_update_args    *args;
-               struct ptlrpc_request           *req;
-
-               rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
-                                        dt_update->dur_buf.ub_req, &req);
-               if (rc != 0)
-                       return rc;
-               down_read(&osp->opd_async_updates_rwsem);
-
-               args = ptlrpc_req_async_args(req);
-               args->oaua_update = dt_update;
-               args->oaua_count = &osp->opd_async_updates_count;
-               args->oaua_waitq = &osp->opd_syn_barrier_waitq;
-               args->oaua_flow_control = flow_control;
-               req->rq_interpret_reply =
-                       osp_async_update_interpret;
+       rc = osp_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
+                                dt_update->dur_buf.ub_req, &req);
+       if (rc != 0)
+               RETURN(rc);
 
-               atomic_inc(args->oaua_count);
-               up_read(&osp->opd_async_updates_rwsem);
+       *sent = 1;
+       req->rq_interpret_reply = osp_update_interpret;
+       args = ptlrpc_req_async_args(req);
+       args->oaua_update = dt_update;
+       if (is_only_remote_trans(th) && !th->th_sync) {
+               args->oaua_flow_control = true;
+
+               if (!osp->opd_connect_mdt) {
+                       down_read(&osp->opd_async_updates_rwsem);
+                       args->oaua_count = &osp->opd_async_updates_count;
+                       args->oaua_waitq = &osp->opd_syn_barrier_waitq;
+                       up_read(&osp->opd_async_updates_rwsem);
+                       atomic_inc(args->oaua_count);
+               }
 
                ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
        } else {
-               rc = osp_remote_sync(env, osp, dt_update, NULL, true);
+               osp_get_rpc_lock(osp);
+               args->oaua_flow_control = false;
+               rc = ptlrpc_queue_wait(req);
+               osp_put_rpc_lock(osp);
+               ptlrpc_req_finished(req);
        }
 
-       return rc;
+       RETURN(rc);
 }
 
 /**
@@ -618,19 +734,7 @@ struct thandle *osp_get_storage_thandle(const struct lu_env *env,
  * to start the transaction.
  *
  * If the transaction is a remote transaction, then related remote
- * updates will be triggered in the osp_trans_stop(); otherwise the
- * transaction contains both local and remote update(s), then when
- * the OUT RPC will be triggered depends on the operation, and is
- * indicated by the dt_device::tu_sent_after_local_trans, for example:
- *
- * 1) If it is remote create, it will send the remote req after local
- * transaction. i.e. create the object locally first, then insert the
- * remote name entry.
- *
- * 2) If it is remote unlink, it will send the remote req before the
- * local transaction, i.e. delete the name entry remotely first, then
- * destroy the local object.
- *
+ * updates will be triggered in the osp_trans_stop().
  * Please refer to osp_trans_create() for transaction type.
  *
  * \param[in] env              pointer to the thread context
@@ -643,51 +747,20 @@ struct thandle *osp_get_storage_thandle(const struct lu_env *env,
 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
                    struct thandle *th)
 {
-       struct osp_thandle       *oth = thandle_to_osp_thandle(th);
-       struct dt_update_request *dt_update;
-       int                      rc = 0;
+       struct osp_thandle      *oth = thandle_to_osp_thandle(th);
 
-       dt_update = oth->ot_dur;
-       LASSERT(dt_update != NULL);
-
-       /* return if there are no updates,  */
-       if (dt_update->dur_buf.ub_req == NULL ||
-           dt_update->dur_buf.ub_req->ourq_count == 0)
-               GOTO(out, rc = 0);
-
-       /* Note: some updates needs to send before local transaction,
-        * some needs to send after local transaction.
-        *
-        * If the transaction only includes remote updates, it will
-        * send updates to remote MDT in osp_trans_stop.
-        *
-        * If it is remote create, it will send the remote req after
-        * local transaction. i.e. create the object locally first,
-        * then insert the name entry.
-        *
-        * If it is remote unlink, it will send the remote req before
-        * the local transaction, i.e. delete the name entry remote
-        * first, then destroy the local object. */
-       if (!is_only_remote_trans(th) &&
-           !oth->ot_send_updates_after_local_trans)
-               rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update, th,
-                                      false);
-
-out:
        /* For remote thandle, if there are local thandle, start it here*/
-       if (th->th_top == NULL && oth->ot_storage_th != NULL)
-               rc = dt_trans_start(env, oth->ot_storage_th->th_dev,
-                                   oth->ot_storage_th);
-
-       return rc;
+       if (is_only_remote_trans(th) && oth->ot_storage_th != NULL)
+               return dt_trans_start(env, oth->ot_storage_th->th_dev,
+                                     oth->ot_storage_th);
+       return 0;
 }
 
 /**
  * The OSP layer dt_device_operations::dt_trans_stop() interface
  * to stop the transaction.
  *
- * If the transaction is a remote transaction, or the update handler
- * is marked as 'tu_sent_after_local_trans', then related remote
+ * If the transaction is a remote transaction, related remote
  * updates will be triggered here via osp_trans_trigger().
  *
  * For synchronous mode update or any failed update, the request
@@ -705,17 +778,12 @@ out:
 int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
                   struct thandle *th)
 {
-
        struct osp_thandle       *oth = thandle_to_osp_thandle(th);
        struct dt_update_request *dt_update;
        int                      rc = 0;
-       bool                     keep_dt_update = false;
+       int                      sent = 0;
        ENTRY;
 
-       dt_update = oth->ot_dur;
-       LASSERT(dt_update != NULL);
-       LASSERT(dt_update != LP_POISON);
-
        /* For remote transaction, if there is local storage thandle,
         * stop it first */
        if (oth->ot_storage_th != NULL && th->th_top == NULL) {
@@ -723,46 +791,62 @@ int osp_trans_stop(const struct lu_env *env, struct dt_device *dt,
                              oth->ot_storage_th);
                oth->ot_storage_th = NULL;
        }
+
+       dt_update = oth->ot_dur;
+       if (dt_update == NULL)
+               GOTO(out, rc);
+
+       LASSERT(dt_update != LP_POISON);
+
        /* If there are no updates, destroy dt_update and thandle */
        if (dt_update->dur_buf.ub_req == NULL ||
-           dt_update->dur_buf.ub_req->ourq_count == 0)
+           dt_update->dur_buf.ub_req->ourq_count == 0) {
+               dt_update_request_destroy(dt_update);
                GOTO(out, rc);
+       }
 
        if (is_only_remote_trans(th) && !th->th_sync) {
                struct osp_device *osp = dt2osp_dev(th->th_dev);
                struct client_obd *cli = &osp->opd_obd->u.cli;
 
-               if (th->th_result != 0) {
-                       rc = th->th_result;
+               rc = obd_get_request_slot(cli);
+               if (rc != 0)
                        GOTO(out, rc);
-               }
 
-               rc = obd_get_request_slot(cli);
                if (!osp->opd_imp_active || !osp->opd_imp_connected) {
-                       if (rc == 0)
-                               obd_put_request_slot(cli);
-                       rc = -ENOTCONN;
+                       obd_put_request_slot(cli);
+                       GOTO(out, rc = -ENOTCONN);
                }
-               if (rc != 0)
-                       GOTO(out, rc);
 
                rc = osp_trans_trigger(env, dt2osp_dev(dt),
-                                      dt_update, th, true);
+                                      dt_update, th, &sent);
                if (rc != 0)
                        obd_put_request_slot(cli);
-               else
-                       keep_dt_update = true;
        } else {
-               if (oth->ot_send_updates_after_local_trans ||
-                  (is_only_remote_trans(th) && th->th_sync))
-                       rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update,
-                                              th, false);
-               rc = dt_update->dur_rc;
+               rc = osp_trans_trigger(env, dt2osp_dev(dt), dt_update,
+                                      th, &sent);
        }
 
 out:
-       if (!keep_dt_update)
+       /* If RPC is triggered successfully, dt_update will be freed in
+        * osp_update_interpreter() */
+       if (rc != 0 && dt_update != NULL && sent == 0) {
+               struct osp_update_callback *ouc;
+               struct osp_update_callback *next;
+
+               list_for_each_entry_safe(ouc, next, &dt_update->dur_cb_items,
+                                ouc_list) {
+                       list_del_init(&ouc->ouc_list);
+                       if (ouc->ouc_interpreter != NULL)
+                               ouc->ouc_interpreter(env, NULL, NULL,
+                                                    ouc->ouc_obj,
+                                                    ouc->ouc_data, 0, rc);
+                       osp_update_callback_fini(env, ouc);
+               }
+
                dt_update_request_destroy(dt_update);
+       }
+
        OBD_FREE_PTR(oth);
 
        RETURN(rc);
index 192a2b8..a6d6b36 100644 (file)
 
 #define OUT_UPDATE_BUFFER_SIZE_ADD     4096
 #define OUT_UPDATE_BUFFER_SIZE_MAX     (256 * 4096)  /* 1MB update size now */
-static inline struct object_update_request *
-object_update_request_alloc(size_t size)
-{
-       struct object_update_request *ourq;
-
-       OBD_ALLOC_LARGE(ourq, size);
-       if (ourq == NULL)
-               RETURN(ERR_PTR(-ENOMEM));
-
-       ourq->ourq_magic = UPDATE_REQUEST_MAGIC;
-       ourq->ourq_count = 0;
-
-       RETURN(ourq);
-}
-
-static inline void
-object_update_request_free(struct object_update_request *ourq,
-                          size_t ourq_size)
-{
-       if (ourq != NULL)
-               OBD_FREE_LARGE(ourq, ourq_size);
-}
-
-/**
- * Allocate and initialize dt_update_request
- *
- * dt_update_request is being used to track updates being executed on
- * this dt_device(OSD or OSP). The update buffer will be 4k initially,
- * and increased if needed.
- *
- * \param [in] dt      dt device
- *
- * \retval             dt_update_request being allocated if succeed
- * \retval             ERR_PTR(errno) if failed
- */
-struct dt_update_request *dt_update_request_create(struct dt_device *dt)
-{
-       struct dt_update_request *dt_update;
-       struct object_update_request *ourq;
-
-       OBD_ALLOC_PTR(dt_update);
-       if (!dt_update)
-               return ERR_PTR(-ENOMEM);
-
-       ourq = object_update_request_alloc(OUT_UPDATE_INIT_BUFFER_SIZE);
-       if (IS_ERR(ourq)) {
-               OBD_FREE_PTR(dt_update);
-               return ERR_CAST(ourq);
-       }
-
-       dt_update->dur_buf.ub_req = ourq;
-       dt_update->dur_buf.ub_req_size = OUT_UPDATE_INIT_BUFFER_SIZE;
-
-       dt_update->dur_dt = dt;
-       dt_update->dur_batchid = 0;
-       INIT_LIST_HEAD(&dt_update->dur_cb_items);
-
-       return dt_update;
-}
-EXPORT_SYMBOL(dt_update_request_create);
-
-/**
- * Destroy dt_update_request
- *
- * \param [in] dt_update       dt_update_request being destroyed
- */
-void dt_update_request_destroy(struct dt_update_request *dt_update)
-{
-       if (dt_update == NULL)
-               return;
-
-       object_update_request_free(dt_update->dur_buf.ub_req,
-                                  dt_update->dur_buf.ub_req_size);
-       OBD_FREE_PTR(dt_update);
-
-       return;
-}
-EXPORT_SYMBOL(dt_update_request_destroy);
-
 /**
  * resize update buffer
  *
@@ -217,11 +138,11 @@ out_update_header_pack(const struct lu_env *env, struct update_buffer *ubuf,
                param = (struct object_update_param *)((char *)param +
                         object_update_param_size(param));
        }
-       ureq->ourq_count++;
 
        CDEBUG(D_INFO, "%p "DFID" idx %u: op %d params %d:%d\n",
               ureq, PFID(fid), ureq->ourq_count, op, params_count,
               (int)update_size);
+       ureq->ourq_count++;
 
        RETURN(obj_update);
 }