Whamcloud - gitweb
LU-1267 lfsck: enhance API for MDT-OST consistency 56/7156/33
authorFan Yong <fan.yong@intel.com>
Thu, 30 Jan 2014 19:05:05 +0000 (03:05 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Wed, 12 Feb 2014 06:14:54 +0000 (06:14 +0000)
Introduce new dt_object method ::do_declare_attr_get(). The caller
can use such method to notify low layer that "It will need the OST
object attribute very soon, please help to prepare in advance". For
the LFSCK layout consistency verification, the osp_declare_attr_get()
will use UPDATE_OBJ RPC with sub_opcode OBJ_ATTR_GET.

Similarly, another new new dt_object method ::do_declare_xattr_get()
is used to notify low layer that "It will need the OST object xattr
very soon, please help to prepare in advance", which uses UPDATE_OBJ
RPC with sub_opcode OBJ_XATTR_GET.

These idempotent requests can be batched together during the phase
of declaration and sent out via single OUT RPC. It can be shared by
any thread that wants to send idempotent requests to the same OST.

Introduce cache in OSP for remote object's attribute and extended
attribute. Currently, it is mainly used to hold those pre-fetched
OST-objects' kinds of attr/parent FID. But it also can be used by
DNE for other purposes in the future.

For performance, the batched idempotent OUT RPC uses asynchronous
mode. Every sub operation needs to register its own interpterer.
These interpterers will be called one by one after the batched OUT
RPC is replied by the OST.

Implement do_attr_get() against OSP-object for the MDT to get OST
object attribute.

Implement do_xattr_get() against OSP-object for the MDT to get OST
object parent FID attribute.

Implement do_xattr_set() against OSP-object for the MDT to set OST
object parent FID extended attribute.

Some code cleanup and re-organization, such as moving transaction
related code from osp/osp_md_object.c to osp/osp_trans.c, moving
common OUT code from osp/osp_md_object.c to target/out_lib.c.

Originally, only DNE operations will use OUT RPCs, so they use sync
mode transaction. But with LFSCK phase II introduced, sync mode OUT
RPC processing is bad performance. The patch improves related funcs
to allow LFSCK to use async mode transaction for OUT RPC processing,
and DNE related operations still use sync mode transaction.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: I4fe99f96ad24d43c1edea3a4a16b7ed206c38c4f
Reviewed-on: http://review.whamcloud.com/7156
Tested-by: Jenkins
Reviewed-by: Alex Zhuravlev <alexey.zhuravlev@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
19 files changed:
lustre/include/dt_object.h
lustre/include/lu_target.h
lustre/include/lustre_export.h
lustre/include/lustre_update.h
lustre/lfsck/lfsck_layout.c
lustre/mdt/mdt_handler.c
lustre/osp/Makefile.in
lustre/osp/osp_dev.c
lustre/osp/osp_internal.h
lustre/osp/osp_md_object.c
lustre/osp/osp_object.c
lustre/osp/osp_trans.c [new file with mode: 0644]
lustre/ptlrpc/Makefile.in
lustre/ptlrpc/ptlrpcd.c
lustre/target/Makefile.am
lustre/target/out_handler.c
lustre/target/out_lib.c [new file with mode: 0644]
lustre/target/tgt_internal.h
lustre/target/tgt_lastrcvd.c

index e93ae43..6a09b34 100644 (file)
@@ -300,6 +300,9 @@ struct dt_object_operations {
          * lu_object_operations, but that would break existing symmetry.
          */
 
          * lu_object_operations, but that would break existing symmetry.
          */
 
+       int   (*do_declare_attr_get)(const struct lu_env *env,
+                                    struct dt_object *dt,
+                                    struct lustre_capa *capa);
         /**
          * Return standard attributes.
          *
         /**
          * Return standard attributes.
          *
@@ -322,6 +325,13 @@ struct dt_object_operations {
                              const struct lu_attr *attr,
                              struct thandle *handle,
                              struct lustre_capa *capa);
                              const struct lu_attr *attr,
                              struct thandle *handle,
                              struct lustre_capa *capa);
+
+       int   (*do_declare_xattr_get)(const struct lu_env *env,
+                                     struct dt_object *dt,
+                                     struct lu_buf *buf,
+                                     const char *name,
+                                     struct lustre_capa *capa);
+
         /**
          * Return a value of an extended attribute.
          *
         /**
          * Return a value of an extended attribute.
          *
@@ -1090,6 +1100,16 @@ static inline int dt_write_locked(const struct lu_env *env,
         return dt->do_ops->do_write_locked(env, dt);
 }
 
         return dt->do_ops->do_write_locked(env, dt);
 }
 
+static inline int dt_declare_attr_get(const struct lu_env *env,
+                                     struct dt_object *dt,
+                                     struct lustre_capa *capa)
+{
+       LASSERT(dt);
+       LASSERT(dt->do_ops);
+       LASSERT(dt->do_ops->do_declare_attr_get);
+       return dt->do_ops->do_declare_attr_get(env, dt, capa);
+}
+
 static inline int dt_attr_get(const struct lu_env *env, struct dt_object *dt,
                               struct lu_attr *la, void *arg)
 {
 static inline int dt_attr_get(const struct lu_env *env, struct dt_object *dt,
                               struct lu_attr *la, void *arg)
 {
@@ -1373,6 +1393,18 @@ static inline int dt_xattr_set(const struct lu_env *env,
         return dt->do_ops->do_xattr_set(env, dt, buf, name, fl, th, capa);
 }
 
         return dt->do_ops->do_xattr_set(env, dt, buf, name, fl, th, capa);
 }
 
+static inline int dt_declare_xattr_get(const struct lu_env *env,
+                                      struct dt_object *dt,
+                                      struct lu_buf *buf,
+                                      const char *name,
+                                      struct lustre_capa *capa)
+{
+       LASSERT(dt);
+       LASSERT(dt->do_ops);
+       LASSERT(dt->do_ops->do_declare_xattr_get);
+       return dt->do_ops->do_declare_xattr_get(env, dt, buf, name, capa);
+}
+
 static inline int dt_xattr_get(const struct lu_env *env,
                               struct dt_object *dt, struct lu_buf *buf,
                               const char *name, struct lustre_capa *capa)
 static inline int dt_xattr_get(const struct lu_env *env,
                               struct dt_object *dt, struct lu_buf *buf,
                               const char *name, struct lustre_capa *capa)
index 02f7e0b..a8d6984 100644 (file)
@@ -337,6 +337,23 @@ int tgt_server_data_update(const struct lu_env *env, struct lu_target *tg,
 int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
                           loff_t off);
 
 int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
                           loff_t off);
 
+/* target/out_lib.c */
+struct update_request *
+out_find_update(struct thandle *th, struct dt_device *dt_dev);
+void out_destroy_update_req(struct update_request *update);
+struct update_request *out_create_update_req(struct dt_device *dt);
+struct update_request *out_find_create_update_loc(struct thandle *th,
+                                                 struct dt_object *dt);
+int out_prep_update_req(const struct lu_env *env, struct obd_import *imp,
+                       const struct update_buf *ubuf, int ubuf_len,
+                       struct ptlrpc_request **reqp);
+int out_remote_sync(const struct lu_env *env, struct obd_import *imp,
+                   struct update_request *update,
+                   struct ptlrpc_request **reqp);
+int out_insert_update(const struct lu_env *env, struct update_request *update,
+                     int op, const struct lu_fid *fid, int count,
+                     int *lens, const char **bufs);
+
 enum {
        ESERIOUS = 0x0001000
 };
 enum {
        ESERIOUS = 0x0001000
 };
index dc2f64d..24e5c47 100644 (file)
@@ -233,6 +233,7 @@ struct obd_export {
                                   exp_req_replay_needed:1,
                                   exp_lock_replay_needed:1,
                                   exp_need_sync:1,
                                   exp_req_replay_needed:1,
                                   exp_lock_replay_needed:1,
                                   exp_need_sync:1,
+                                 exp_keep_sync:1,
                                   exp_flvr_changed:1,
                                   exp_flvr_adapt:1,
                                   exp_libclient:1, /* liblustre client? */
                                   exp_flvr_changed:1,
                                   exp_flvr_adapt:1,
                                   exp_libclient:1, /* liblustre client? */
index 1829cf6..e23c66e 100644 (file)
 #define UPDATE_BUFFER_SIZE     8192
 struct update_request {
        struct dt_device        *ur_dt;
 #define UPDATE_BUFFER_SIZE     8192
 struct update_request {
        struct dt_device        *ur_dt;
-       cfs_list_t              ur_list;    /* attached itself to thandle */
-       int                     ur_flags;
-       int                     ur_rc;      /* request result */
-       int                     ur_batchid; /* Current batch(trans) id */
-       struct update_buf       *ur_buf;   /* Holding the update req */
+       struct list_head         ur_list; /* attached itself to thandle */
+       int                      ur_flags;
+       int                      ur_rc; /* request result */
+       int                      ur_batchid; /* Current batch(trans) id */
+       struct update_buf       *ur_buf; /* Holding the update req */
+       struct list_head         ur_cb_items;
 };
 
 static inline unsigned long update_size(struct update *update)
 };
 
 static inline unsigned long update_size(struct update *update)
@@ -157,13 +158,15 @@ static inline void update_insert_reply(struct update_reply *reply, void *data,
        reply->ur_lens[index] = data_len + sizeof(int);
 }
 
        reply->ur_lens[index] = data_len + sizeof(int);
 }
 
-static inline int update_get_reply_buf(struct update_reply *reply, void **buf,
-                                      int index)
+static inline int update_get_reply_buf(struct update_reply *reply,
+                                      struct lu_buf *lbuf, int index)
 {
        char *ptr;
        int  size = 0;
        int  result;
 
 {
        char *ptr;
        int  size = 0;
        int  result;
 
+       LASSERT(lbuf != NULL);
+
        ptr = update_get_buf_internal(reply, index, &size);
        LASSERT(ptr != NULL);
        result = *(int *)ptr;
        ptr = update_get_buf_internal(reply, index, &size);
        LASSERT(ptr != NULL);
        result = *(int *)ptr;
@@ -172,8 +175,11 @@ static inline int update_get_reply_buf(struct update_reply *reply, void **buf,
                return result;
 
        LASSERT(size >= sizeof(int));
                return result;
 
        LASSERT(size >= sizeof(int));
-       *buf = ptr + sizeof(int);
-       return size - sizeof(int);
+
+       lbuf->lb_buf = ptr + sizeof(int);
+       lbuf->lb_len = size - sizeof(int);
+
+       return 0;
 }
 
 static inline int update_get_reply_result(struct update_reply *reply,
 }
 
 static inline int update_get_reply_result(struct update_reply *reply,
@@ -187,6 +193,9 @@ static inline int update_get_reply_result(struct update_reply *reply,
        return *(int *)ptr;
 }
 
        return *(int *)ptr;
 }
 
-#endif
-
+static inline void update_inc_batchid(struct update_request *update)
+{
+       update->ur_batchid++;
+}
 
 
+#endif
index bdcd4c4..efe305a 100644 (file)
@@ -117,7 +117,6 @@ struct lfsck_layout_master_data {
        struct list_head        llmd_mdt_phase2_list;
 
        struct ptlrpc_thread    llmd_thread;
        struct list_head        llmd_mdt_phase2_list;
 
        struct ptlrpc_thread    llmd_thread;
-       atomic_t                llmd_rpcs_in_flight;
        __u32                   llmd_touch_gen;
        int                     llmd_prefetched;
        int                     llmd_assistant_status;
        __u32                   llmd_touch_gen;
        int                     llmd_prefetched;
        int                     llmd_assistant_status;
@@ -1285,13 +1284,6 @@ static int lfsck_layout_assistant(void *args)
                while (!list_empty(&llmd->llmd_req_list)) {
                        bool wakeup = false;
 
                while (!list_empty(&llmd->llmd_req_list)) {
                        bool wakeup = false;
 
-                       l_wait_event(athread->t_ctl_waitq,
-                                    bk->lb_async_windows == 0 ||
-                                    atomic_read(&llmd->llmd_rpcs_in_flight) <
-                                               bk->lb_async_windows ||
-                                    llmd->llmd_exit,
-                                    &lwi);
-
                        if (unlikely(llmd->llmd_exit))
                                GOTO(cleanup1, rc = llmd->llmd_post_result);
 
                        if (unlikely(llmd->llmd_exit))
                                GOTO(cleanup1, rc = llmd->llmd_post_result);
 
@@ -1318,8 +1310,7 @@ static int lfsck_layout_assistant(void *args)
                }
 
                /* Wakeup the master engine if it is waiting in checkpoint. */
                }
 
                /* Wakeup the master engine if it is waiting in checkpoint. */
-               if (atomic_read(&llmd->llmd_rpcs_in_flight) == 0)
-                       wake_up_all(&mthread->t_ctl_waitq);
+               wake_up_all(&mthread->t_ctl_waitq);
 
                l_wait_event(athread->t_ctl_waitq,
                             !lfsck_layout_req_empty(llmd) ||
 
                l_wait_event(athread->t_ctl_waitq,
                             !lfsck_layout_req_empty(llmd) ||
@@ -1443,10 +1434,6 @@ cleanup1:
        LASSERTF(llmd->llmd_prefetched == 0, "unmatched prefeteched objs %d\n",
                 llmd->llmd_prefetched);
 
        LASSERTF(llmd->llmd_prefetched == 0, "unmatched prefeteched objs %d\n",
                 llmd->llmd_prefetched);
 
-       l_wait_event(athread->t_ctl_waitq,
-                    atomic_read(&llmd->llmd_rpcs_in_flight) == 0,
-                    &lwi);
-
 cleanup2:
        memset(lr, 0, sizeof(*lr));
        lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
 cleanup2:
        memset(lr, 0, sizeof(*lr));
        lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
@@ -1807,8 +1794,7 @@ static int lfsck_layout_master_checkpoint(const struct lu_env *env,
                return 0;
 
        l_wait_event(mthread->t_ctl_waitq,
                return 0;
 
        l_wait_event(mthread->t_ctl_waitq,
-                    (list_empty(&llmd->llmd_req_list) &&
-                     atomic_read(&llmd->llmd_rpcs_in_flight) == 0) ||
+                    list_empty(&llmd->llmd_req_list) ||
                     !thread_is_running(mthread) ||
                     thread_is_stopped(athread),
                     &lwi);
                     !thread_is_running(mthread) ||
                     thread_is_stopped(athread),
                     &lwi);
@@ -2129,8 +2115,7 @@ static int lfsck_layout_master_post(const struct lu_env *env,
 
        wake_up_all(&athread->t_ctl_waitq);
        l_wait_event(mthread->t_ctl_waitq,
 
        wake_up_all(&athread->t_ctl_waitq);
        l_wait_event(mthread->t_ctl_waitq,
-                    (result > 0 && list_empty(&llmd->llmd_req_list) &&
-                     atomic_read(&llmd->llmd_rpcs_in_flight) == 0) ||
+                    (result > 0 && list_empty(&llmd->llmd_req_list)) ||
                     thread_is_stopped(athread),
                     &lwi);
 
                     thread_is_stopped(athread),
                     &lwi);
 
@@ -2526,7 +2511,6 @@ static void lfsck_layout_master_data_release(const struct lu_env *env,
        LASSERT(thread_is_init(&llmd->llmd_thread) ||
                thread_is_stopped(&llmd->llmd_thread));
        LASSERT(list_empty(&llmd->llmd_req_list));
        LASSERT(thread_is_init(&llmd->llmd_thread) ||
                thread_is_stopped(&llmd->llmd_thread));
        LASSERT(list_empty(&llmd->llmd_req_list));
-       LASSERT(atomic_read(&llmd->llmd_rpcs_in_flight) == 0);
 
        com->lc_data = NULL;
 
 
        com->lc_data = NULL;
 
@@ -2881,7 +2865,6 @@ int lfsck_layout_setup(const struct lu_env *env, struct lfsck_instance *lfsck)
                INIT_LIST_HEAD(&llmd->llmd_mdt_phase1_list);
                INIT_LIST_HEAD(&llmd->llmd_mdt_phase2_list);
                init_waitqueue_head(&llmd->llmd_thread.t_ctl_waitq);
                INIT_LIST_HEAD(&llmd->llmd_mdt_phase1_list);
                INIT_LIST_HEAD(&llmd->llmd_mdt_phase2_list);
                init_waitqueue_head(&llmd->llmd_thread.t_ctl_waitq);
-               atomic_set(&llmd->llmd_rpcs_in_flight, 0);
                com->lc_data = llmd;
        } else {
                struct lfsck_layout_slave_data *llsd;
                com->lc_data = llmd;
        } else {
                struct lfsck_layout_slave_data *llsd;
index 064fef1..b46fab4 100644 (file)
@@ -4926,6 +4926,9 @@ static int mdt_obd_connect(const struct lu_env *env,
                rc = tgt_client_new(env, lexp);
                 if (rc == 0)
                         mdt_export_stats_init(obd, lexp, localdata);
                rc = tgt_client_new(env, lexp);
                 if (rc == 0)
                         mdt_export_stats_init(obd, lexp, localdata);
+
+               /* For phase I, sync for cross-ref operation. */
+               lexp->exp_keep_sync = 1;
         }
 
         if (rc != 0) {
         }
 
         if (rc != 0) {
index 4c27a13..d744ec9 100644 (file)
@@ -1,6 +1,6 @@
 MODULES = osp
 osp-objs = osp_dev.o osp_object.o osp_precreate.o osp_sync.o lproc_osp.o
 MODULES = osp
 osp-objs = osp_dev.o osp_object.o osp_precreate.o osp_sync.o lproc_osp.o
-osp-objs += lwp_dev.o  osp_md_object.o
+osp-objs += lwp_dev.o osp_md_object.o osp_trans.o
 
 EXTRA_DIST = $(osp-objs:.o=.c) osp_internal.h
 
 
 EXTRA_DIST = $(osp-objs:.o=.c) osp_internal.h
 
index 5828d88..23a81b2 100644 (file)
@@ -489,10 +489,11 @@ static int osp_sync(const struct lu_env *env, struct dt_device *dev)
 }
 
 const struct dt_device_operations osp_dt_ops = {
 }
 
 const struct dt_device_operations osp_dt_ops = {
-       .dt_statfs      = osp_statfs,
-       .dt_sync        = osp_sync,
-       .dt_trans_start = osp_trans_start,
-       .dt_trans_stop  = osp_trans_stop,
+       .dt_statfs       = osp_statfs,
+       .dt_sync         = osp_sync,
+       .dt_trans_create = osp_trans_create,
+       .dt_trans_start  = osp_trans_start,
+       .dt_trans_stop   = osp_trans_stop,
 };
 
 static int osp_connect_to_osd(const struct lu_env *env, struct osp_device *m,
 };
 
 static int osp_connect_to_osd(const struct lu_env *env, struct osp_device *m,
@@ -547,6 +548,7 @@ static int osp_init0(const struct lu_env *env, struct osp_device *m,
 
        ENTRY;
 
 
        ENTRY;
 
+       mutex_init(&m->opd_async_requests_mutex);
        obd = class_name2obd(lustre_cfg_string(cfg, 0));
        if (obd == NULL) {
                CERROR("Cannot find obd with name %s\n",
        obd = class_name2obd(lustre_cfg_string(cfg, 0));
        if (obd == NULL) {
                CERROR("Cannot find obd with name %s\n",
@@ -819,6 +821,11 @@ static struct lu_device *osp_device_fini(const struct lu_env *env,
 
        ENTRY;
 
 
        ENTRY;
 
+       if (m->opd_async_requests != NULL) {
+               out_destroy_update_req(m->opd_async_requests);
+               m->opd_async_requests = NULL;
+       }
+
        if (m->opd_storage_exp)
                obd_disconnect(m->opd_storage_exp);
 
        if (m->opd_storage_exp)
                obd_disconnect(m->opd_storage_exp);
 
@@ -1198,7 +1205,7 @@ static struct lu_device_type osp_device_type = {
        .ldt_tags     = LU_DEVICE_DT,
        .ldt_name     = LUSTRE_OSP_NAME,
        .ldt_ops      = &osp_device_type_ops,
        .ldt_tags     = LU_DEVICE_DT,
        .ldt_name     = LUSTRE_OSP_NAME,
        .ldt_ops      = &osp_device_type_ops,
-       .ldt_ctx_tags = LCT_MD_THREAD
+       .ldt_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD,
 };
 
 static struct obd_ops osp_obd_device_ops = {
 };
 
 static struct obd_ops osp_obd_device_ops = {
index 0871d8d..1ba68d2 100644 (file)
@@ -46,6 +46,8 @@
 #include <dt_object.h>
 #include <md_object.h>
 #include <lustre_fid.h>
 #include <dt_object.h>
 #include <md_object.h>
 #include <lustre_fid.h>
+#include <lustre_update.h>
+#include <lu_target.h>
 
 /*
  * Infrastructure to support tracking of last committed llog record
 
 /*
  * Infrastructure to support tracking of last committed llog record
@@ -184,6 +186,15 @@ struct osp_device {
        int                              opd_statfs_maxage;
 
        cfs_proc_dir_entry_t            *opd_symlink;
        int                              opd_statfs_maxage;
 
        cfs_proc_dir_entry_t            *opd_symlink;
+
+       /* If the caller wants to do some idempotent async operations on
+        * remote server, it can append the async remote requests on the
+        * osp_device::opd_async_requests via declare() functions, these
+        * requests can be packed together and sent to the remote server
+        * via single OUT RPC later. */
+       struct update_request           *opd_async_requests;
+       /* Protect current operations on opd_async_requests. */
+       struct mutex                     opd_async_requests_mutex;
 };
 
 #define opd_pre_lock                   opd_pre->osp_pre_lock
 };
 
 #define opd_pre_lock                   opd_pre->osp_pre_lock
@@ -200,17 +211,40 @@ struct osp_device {
 
 extern struct kmem_cache *osp_object_kmem;
 
 
 extern struct kmem_cache *osp_object_kmem;
 
+/* The first part of oxe_buf is xattr name, and is '\0' terminated.
+ * The left part is for value, binary mode. */
+struct osp_xattr_entry {
+       struct list_head         oxe_list;
+       atomic_t                 oxe_ref;
+       void                    *oxe_value;
+       int                      oxe_buflen;
+       int                      oxe_namelen;
+       int                      oxe_vallen;
+       unsigned int             oxe_exist:1,
+                                oxe_ready:1;
+       char                     oxe_buf[0];
+};
+
+struct osp_object_attr {
+       struct lu_attr          ooa_attr;
+       struct list_head        ooa_xattr_list;
+};
+
 /* this is a top object */
 struct osp_object {
        struct lu_object_header opo_header;
        struct dt_object        opo_obj;
        unsigned int            opo_reserved:1,
                                opo_new:1,
 /* this is a top object */
 struct osp_object {
        struct lu_object_header opo_header;
        struct dt_object        opo_obj;
        unsigned int            opo_reserved:1,
                                opo_new:1,
-                               opo_empty:1;
+                               opo_empty:1,
+                               opo_non_exist:1;
 
        /* read/write lock for md osp object */
        struct rw_semaphore     opo_sem;
        const struct lu_env     *opo_owner;
 
        /* read/write lock for md osp object */
        struct rw_semaphore     opo_sem;
        const struct lu_env     *opo_owner;
+       struct osp_object_attr *opo_ooa;
+       /* Protect opo_ooa. */
+       spinlock_t              opo_lock;
 };
 
 extern struct lu_object_operations osp_lu_obj_ops;
 };
 
 extern struct lu_object_operations osp_lu_obj_ops;
@@ -239,6 +273,11 @@ struct osp_thread_info {
        struct obdo              osi_obdo;
 };
 
        struct obdo              osi_obdo;
 };
 
+static inline bool is_remote_trans(struct thandle *th)
+{
+       return th->th_dev->dd_ops == &osp_dt_ops;
+}
+
 static inline void osp_objid_buf_prep(struct lu_buf *buf, loff_t *off,
                                      __u32 *id, int index)
 {
 static inline void osp_objid_buf_prep(struct lu_buf *buf, loff_t *off,
                                      __u32 *id, int index)
 {
@@ -409,14 +448,49 @@ static inline int osp_is_fid_client(struct osp_device *osp)
        return imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_FID;
 }
 
        return imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_FID;
 }
 
+typedef int (*osp_async_update_interpterer_t)(const struct lu_env *env,
+                                             struct update_reply *reply,
+                                             struct osp_object *obj,
+                                             void *data, int index, int rc);
+
 /* osp_dev.c */
 void osp_update_last_id(struct osp_device *d, obd_id objid);
 extern struct llog_operations osp_mds_ost_orig_logops;
 
 /* osp_dev.c */
 void osp_update_last_id(struct osp_device *d, obd_id objid);
 extern struct llog_operations osp_mds_ost_orig_logops;
 
-/* osp_md_object.c */
+/* osp_trans.c */
+struct update_request *
+osp_find_or_create_async_update_request(struct osp_device *osp);
+int osp_insert_async_update(const struct lu_env *env,
+                           struct update_request *update, int op,
+                           struct osp_object *obj, int count,
+                           int *lens, const char **bufs, void *data,
+                           osp_async_update_interpterer_t interpterer);
+int osp_unplug_async_update(const struct lu_env *env,
+                           struct osp_device *osp,
+                           struct update_request *update);
+struct thandle *osp_trans_create(const struct lu_env *env,
+                                struct dt_device *d);
 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
                    struct thandle *th);
 int osp_trans_stop(const struct lu_env *env, struct thandle *th);
 int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
                    struct thandle *th);
 int osp_trans_stop(const struct lu_env *env, struct thandle *th);
+
+/* osp_object.c */
+int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
+                struct lu_attr *attr, struct lustre_capa *capa);
+int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
+                 struct lu_buf *buf, const char *name,
+                 struct lustre_capa *capa);
+int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                         const struct lu_buf *buf, const char *name,
+                         int flag, struct thandle *th);
+int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                 const struct lu_buf *buf, const char *name, int fl,
+                 struct thandle *th, struct lustre_capa *capa);
+int osp_declare_object_destroy(const struct lu_env *env,
+                              struct dt_object *dt, struct thandle *th);
+int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
+                      struct thandle *th);
+
 /* osp_precreate.c */
 int osp_init_precreate(struct osp_device *d);
 int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d);
 /* osp_precreate.c */
 int osp_init_precreate(struct osp_device *d);
 int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d);
index c790581..fd262ac 100644 (file)
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <lustre_log.h>
 #define DEBUG_SUBSYSTEM S_MDS
 
 #include <lustre_log.h>
-#include <lustre_update.h>
 #include "osp_internal.h"
 #include "osp_internal.h"
+
 static const char dot[] = ".";
 static const char dotdot[] = "..";
 
 static const char dot[] = ".";
 static const char dotdot[] = "..";
 
-static int osp_prep_update_req(const struct lu_env *env,
-                              struct osp_device *osp,
-                              struct update_buf *ubuf, int ubuf_len,
-                              struct ptlrpc_request **reqp)
-{
-       struct obd_import      *imp;
-       struct ptlrpc_request  *req;
-       struct update_buf      *tmp;
-       int                     rc;
-       ENTRY;
-
-       imp = osp->opd_obd->u.cli.cl_import;
-       LASSERT(imp);
-
-       req = ptlrpc_request_alloc(imp, &RQF_UPDATE_OBJ);
-       if (req == NULL)
-               RETURN(-ENOMEM);
-
-       req_capsule_set_size(&req->rq_pill, &RMF_UPDATE, RCL_CLIENT,
-                            UPDATE_BUFFER_SIZE);
-
-       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, UPDATE_OBJ);
-       if (rc != 0) {
-               ptlrpc_req_finished(req);
-               RETURN(rc);
-       }
-
-       req_capsule_set_size(&req->rq_pill, &RMF_UPDATE_REPLY, RCL_SERVER,
-                            UPDATE_BUFFER_SIZE);
-
-       tmp = req_capsule_client_get(&req->rq_pill, &RMF_UPDATE);
-       memcpy(tmp, ubuf, ubuf_len);
-
-       ptlrpc_request_set_replen(req);
-
-       *reqp = req;
-
-       RETURN(rc);
-}
-
-static int osp_remote_sync(const struct lu_env *env, struct dt_device *dt,
-                          struct update_request *update,
-                          struct ptlrpc_request **reqp)
-{
-       struct osp_device       *osp = dt2osp_dev(dt);
-       struct ptlrpc_request   *req = NULL;
-       int                     rc;
-       ENTRY;
-
-       rc = osp_prep_update_req(env, osp, update->ur_buf, UPDATE_BUFFER_SIZE,
-                                &req);
-       if (rc)
-               RETURN(rc);
-
-       /* Note: some dt index api might return non-zero result here, like
-        * osd_index_ea_lookup, so we should only check rc < 0 here */
-       rc = ptlrpc_queue_wait(req);
-       if (rc < 0) {
-               ptlrpc_req_finished(req);
-               update->ur_rc = rc;
-               RETURN(rc);
-       }
-
-       if (reqp != NULL) {
-               *reqp = req;
-               RETURN(rc);
-       }
-
-       update->ur_rc = rc;
-
-       ptlrpc_req_finished(req);
-
-       RETURN(rc);
-}
-
-/**
- * Create a new update request for the device.
- */
-static struct update_request
-*osp_create_update_req(struct dt_device *dt)
-{
-       struct update_request *update;
-
-       OBD_ALLOC_PTR(update);
-       if (!update)
-               return ERR_PTR(-ENOMEM);
-
-       OBD_ALLOC_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
-       if (update->ur_buf == NULL) {
-               OBD_FREE_PTR(update);
-               return ERR_PTR(-ENOMEM);
-       }
-
-       CFS_INIT_LIST_HEAD(&update->ur_list);
-       update->ur_dt = dt;
-       update->ur_batchid = 0;
-       update->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC;
-       update->ur_buf->ub_count = 0;
-
-       return update;
-}
-
-static void osp_destroy_update_req(struct update_request *update)
-{
-       if (update == NULL)
-               return;
-
-       cfs_list_del(&update->ur_list);
-       if (update->ur_buf != NULL)
-               OBD_FREE_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
-
-       OBD_FREE_PTR(update);
-       return;
-}
-
-int osp_trans_stop(const struct lu_env *env, struct thandle *th)
-{
-       int rc = 0;
-
-       rc = th->th_current_request->ur_rc;
-       osp_destroy_update_req(th->th_current_request);
-       th->th_current_request = NULL;
-
-       return rc;
-}
-
-/**
- * In DNE phase I, all remote updates will be packed into RPC (the format
- * description is in lustre_idl.h) during declare phase, all of updates
- * are attached to the transaction, one entry per OSP. Then in trans start,
- * LOD will walk through these entries and send these UPDATEs to the remote
- * MDT to be executed synchronously.
- */
-int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
-                   struct thandle *th)
-{
-       struct update_request *update;
-       int rc = 0;
-
-       /* In phase I, if the transaction includes remote updates, the local
-        * update should be synchronized, so it will set th_sync = 1 */
-       update = th->th_current_request;
-       LASSERT(update != NULL && update->ur_dt == dt);
-       if (update->ur_buf->ub_count > 0) {
-               rc = osp_remote_sync(env, dt, update, NULL);
-               th->th_sync = 1;
-       }
-
-       RETURN(rc);
-}
-
-/**
- * Insert the update into the th_bufs for the device.
- */
-static int osp_insert_update(const struct lu_env *env,
-                            struct update_request *update, int op,
-                            struct lu_fid *fid, int count,
-                            int *lens, char **bufs)
-{
-       struct update_buf    *ubuf = update->ur_buf;
-       struct update        *obj_update;
-       char                 *ptr;
-       int                   i;
-       int                   update_length;
-       int                   rc = 0;
-       ENTRY;
-
-       obj_update = (struct update *)((char *)ubuf +
-                     cfs_size_round(update_buf_size(ubuf)));
-
-       /* Check update size to make sure it can fit into the buffer */
-       update_length = cfs_size_round(offsetof(struct update,
-                                      u_bufs[0]));
-       for (i = 0; i < count; i++)
-               update_length += cfs_size_round(lens[i]);
-
-       if (cfs_size_round(update_buf_size(ubuf)) + update_length >
-           UPDATE_BUFFER_SIZE || ubuf->ub_count >= UPDATE_MAX_OPS) {
-               CERROR("%s: insert up %p, idx %d cnt %d len %lu: rc = %d\n",
-                       update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf,
-                       update_length, ubuf->ub_count, update_buf_size(ubuf),
-                       -E2BIG);
-               RETURN(-E2BIG);
-       }
-
-       if (count > UPDATE_BUF_COUNT) {
-               CERROR("%s: Insert too much params %d "DFID" op %d: rc = %d\n",
-                       update->ur_dt->dd_lu_dev.ld_obd->obd_name, count,
-                       PFID(fid), op, -E2BIG);
-               RETURN(-E2BIG);
-       }
-
-       /* fill the update into the update buffer */
-       fid_cpu_to_le(&obj_update->u_fid, fid);
-       obj_update->u_type = cpu_to_le32(op);
-       obj_update->u_batchid = update->ur_batchid;
-       for (i = 0; i < count; i++)
-               obj_update->u_lens[i] = cpu_to_le32(lens[i]);
-
-       ptr = (char *)obj_update +
-                       cfs_size_round(offsetof(struct update, u_bufs[0]));
-       for (i = 0; i < count; i++)
-               LOGL(bufs[i], lens[i], ptr);
-
-       ubuf->ub_count++;
-
-       CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%lu\n",
-              update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf, PFID(fid),
-              ubuf->ub_count, op, count, update_buf_size(ubuf));
-
-       RETURN(rc);
-}
-
-static struct update_request
-*osp_find_update(struct thandle *th, struct dt_device *dt_dev)
-{
-       struct update_request   *update;
-
-       /* Because transaction api does not proivde the interface
-        * to transfer the update from LOD to OSP,  we need walk
-        * remote update list to find the update, this probably
-        * should move to LOD layer, when update can be part of
-        * the trancation api parameter. XXX */
-       cfs_list_for_each_entry(update, &th->th_remote_update_list, ur_list) {
-               if (update->ur_dt == dt_dev)
-                       return update;
-       }
-       return NULL;
-}
-
-static inline void osp_md_add_update_batchid(struct update_request *update)
-{
-       update->ur_batchid++;
-}
-
-/**
- * Find one loc in th_dev/dev_obj_update for the update,
- * Because only one thread can access this thandle, no need
- * lock now.
- */
-static struct update_request
-*osp_find_create_update_loc(struct thandle *th, struct dt_object *dt)
-{
-       struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
-       struct update_request   *update;
-       ENTRY;
-
-       update = osp_find_update(th, dt_dev);
-       if (update != NULL)
-               RETURN(update);
-
-       update = osp_create_update_req(dt_dev);
-       if (IS_ERR(update))
-               RETURN(update);
-
-       cfs_list_add_tail(&update->ur_list, &th->th_remote_update_list);
-
-       RETURN(update);
-}
-
-static int osp_get_attr_from_req(const struct lu_env *env,
-                                struct ptlrpc_request *req,
-                                struct lu_attr *attr, int index)
-{
-       struct update_reply     *reply;
-       struct obdo             *lobdo = &osp_env_info(env)->osi_obdo;
-       struct obdo             *wobdo;
-       int                     size;
-
-       LASSERT(attr != NULL);
-
-       reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
-                                            UPDATE_BUFFER_SIZE);
-       if (reply == NULL || reply->ur_version != UPDATE_REPLY_V1)
-               return -EPROTO;
-
-       size = update_get_reply_buf(reply, (void **)&wobdo, index);
-       if (size != sizeof(struct obdo))
-               return -EPROTO;
-
-       obdo_le_to_cpu(wobdo, wobdo);
-       lustre_get_wire_obdo(NULL, lobdo, wobdo);
-       la_from_obdo(attr, lobdo, lobdo->o_valid);
-
-       return 0;
-}
-
 static int osp_md_declare_object_create(const struct lu_env *env,
                                        struct dt_object *dt,
                                        struct lu_attr *attr,
 static int osp_md_declare_object_create(const struct lu_env *env,
                                        struct dt_object *dt,
                                        struct lu_attr *attr,
@@ -340,7 +53,7 @@ static int osp_md_declare_object_create(const struct lu_env *env,
        int                     buf_count;
        int                     rc;
 
        int                     buf_count;
        int                     rc;
 
-       update = osp_find_create_update_loc(th, dt);
+       update = out_find_create_update_loc(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
@@ -385,20 +98,20 @@ static int osp_md_declare_object_create(const struct lu_env *env,
                CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
 
                CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
 
-               rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1, 0,
+               rc = out_insert_update(env, update, OBJ_REF_DEL, fid1, 0,
                                       NULL, NULL);
                if (rc != 0)
                        GOTO(out, rc);
 
                if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
                        /* decrease for ".." */
                                       NULL, NULL);
                if (rc != 0)
                        GOTO(out, rc);
 
                if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
                        /* decrease for ".." */
-                       rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1,
+                       rc = out_insert_update(env, update, OBJ_REF_DEL, fid1,
                                               0, NULL, NULL);
                        if (rc != 0)
                                GOTO(out, rc);
                }
 
                                               0, NULL, NULL);
                        if (rc != 0)
                                GOTO(out, rc);
                }
 
-               rc = osp_insert_update(env, update, OBJ_DESTROY, fid1, 0, NULL,
+               rc = out_insert_update(env, update, OBJ_DESTROY, fid1, 0, NULL,
                                       NULL);
                if (rc != 0)
                        GOTO(out, rc);
                                       NULL);
                if (rc != 0)
                        GOTO(out, rc);
@@ -406,11 +119,11 @@ static int osp_md_declare_object_create(const struct lu_env *env,
                dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
                /* Increase batchid to add this orphan object deletion
                 * to separate transaction */
                dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
                /* Increase batchid to add this orphan object deletion
                 * to separate transaction */
-               osp_md_add_update_batchid(update);
+               update_inc_batchid(update);
        }
 
        }
 
-       rc = osp_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes,
-                              bufs);
+       rc = out_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes,
+                              (const char **)bufs);
 out:
        if (rc)
                CERROR("%s: Insert update error: rc = %d\n",
 out:
        if (rc)
                CERROR("%s: Insert update error: rc = %d\n",
@@ -447,7 +160,7 @@ static int osp_md_declare_object_ref_del(const struct lu_env *env,
        struct lu_fid           *fid;
        int                     rc;
 
        struct lu_fid           *fid;
        int                     rc;
 
-       update = osp_find_create_update_loc(th, dt);
+       update = out_find_create_update_loc(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
@@ -457,7 +170,7 @@ static int osp_md_declare_object_ref_del(const struct lu_env *env,
 
        fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
 
 
        fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
 
-       rc = osp_insert_update(env, update, OBJ_REF_DEL, fid, 0, NULL, NULL);
+       rc = out_insert_update(env, update, OBJ_REF_DEL, fid, 0, NULL, NULL);
 
        return rc;
 }
 
        return rc;
 }
@@ -479,7 +192,7 @@ static int osp_md_declare_ref_add(const struct lu_env *env,
        struct lu_fid           *fid;
        int                     rc;
 
        struct lu_fid           *fid;
        int                     rc;
 
-       update = osp_find_create_update_loc(th, dt);
+       update = out_find_create_update_loc(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
@@ -489,7 +202,7 @@ static int osp_md_declare_ref_add(const struct lu_env *env,
 
        fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
 
 
        fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
 
-       rc = osp_insert_update(env, update, OBJ_REF_ADD, fid, 0, NULL, NULL);
+       rc = out_insert_update(env, update, OBJ_REF_ADD, fid, 0, NULL, NULL);
 
        return rc;
 }
 
        return rc;
 }
@@ -529,7 +242,7 @@ static int osp_md_declare_attr_set(const struct lu_env *env,
        char                   *buf;
        int                     rc;
 
        char                   *buf;
        int                     rc;
 
-       update = osp_find_create_update_loc(th, dt);
+       update = out_find_create_update_loc(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
@@ -547,7 +260,8 @@ static int osp_md_declare_attr_set(const struct lu_env *env,
        buf = (char *)&osi->osi_obdo;
        fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
 
        buf = (char *)&osi->osi_obdo;
        fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
 
-       rc = osp_insert_update(env, update, OBJ_ATTR_SET, fid, 1, &size, &buf);
+       rc = out_insert_update(env, update, OBJ_ATTR_SET, fid, 1, &size,
+                              (const char **)&buf);
 
        return rc;
 }
 
        return rc;
 }
@@ -562,114 +276,6 @@ static int osp_md_attr_set(const struct lu_env *env, struct dt_object *dt,
        RETURN(0);
 }
 
        RETURN(0);
 }
 
-static int osp_md_declare_xattr_set(const struct lu_env *env,
-                                   struct dt_object *dt,
-                                   const struct lu_buf *buf,
-                                   const char *name, int flag,
-                                   struct thandle *th)
-{
-       struct update_request   *update;
-       struct lu_fid           *fid;
-       int                     sizes[3] = {strlen(name), buf->lb_len,
-                                           sizeof(int)};
-       char                    *bufs[3] = {(char *)name, (char *)buf->lb_buf };
-       int                     rc;
-
-       LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
-       update = osp_find_create_update_loc(th, dt);
-       if (IS_ERR(update)) {
-               CERROR("%s: Get OSP update buf failed: rc = %d\n",
-                      dt->do_lu.lo_dev->ld_obd->obd_name,
-                      (int)PTR_ERR(update));
-               return PTR_ERR(update);
-       }
-
-       flag = cpu_to_le32(flag);
-       bufs[2] = (char *)&flag;
-
-       fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
-       rc = osp_insert_update(env, update, OBJ_XATTR_SET, fid,
-                              ARRAY_SIZE(sizes), sizes, bufs);
-
-       return rc;
-}
-
-static int osp_md_xattr_set(const struct lu_env *env, struct dt_object *dt,
-                           const struct lu_buf *buf, const char *name, int fl,
-                           struct thandle *th, struct lustre_capa *capa)
-{
-       CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name,
-              PFID(&dt->do_lu.lo_header->loh_fid));
-
-       return 0;
-}
-
-static int osp_md_xattr_get(const struct lu_env *env, struct dt_object *dt,
-                           struct lu_buf *buf, const char *name,
-                           struct lustre_capa *capa)
-{
-       struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
-       struct update_request   *update = NULL;
-       struct ptlrpc_request   *req = NULL;
-       int                     rc;
-       int                     buf_len;
-       int                     size;
-       struct update_reply     *reply;
-       void                    *ea_buf;
-       ENTRY;
-
-       /* Because it needs send the update buffer right away,
-        * just create an update buffer, instead of attaching the
-        * update_remote list of the thandle.
-        */
-       update = osp_create_update_req(dt_dev);
-       if (IS_ERR(update))
-               RETURN(PTR_ERR(update));
-
-       LASSERT(name != NULL);
-       buf_len = strlen(name);
-       rc = osp_insert_update(env, update, OBJ_XATTR_GET,
-                              (struct lu_fid *)lu_object_fid(&dt->do_lu),
-                              1, &buf_len, (char **)&name);
-       if (rc != 0) {
-               CERROR("%s: Insert update error: rc = %d\n",
-                      dt->do_lu.lo_dev->ld_obd->obd_name, rc);
-               GOTO(out, rc);
-       }
-       dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
-
-       rc = osp_remote_sync(env, dt_dev, update, &req);
-       if (rc != 0)
-               GOTO(out, rc);
-
-       reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
-                                           UPDATE_BUFFER_SIZE);
-       if (reply->ur_version != UPDATE_REPLY_V1) {
-               CERROR("%s: Wrong version %x expected %x: rc = %d\n",
-                      dt_dev->dd_lu_dev.ld_obd->obd_name,
-                      reply->ur_version, UPDATE_REPLY_V1, -EPROTO);
-               GOTO(out, rc = -EPROTO);
-       }
-
-       size = update_get_reply_buf(reply, &ea_buf, 0);
-       if (size < 0)
-               GOTO(out, rc = size);
-
-       LASSERT(size > 0 && size < PAGE_CACHE_SIZE);
-       LASSERT(ea_buf != NULL);
-
-       rc = size;
-       if (buf->lb_buf != NULL)
-               memcpy(buf->lb_buf, ea_buf, size);
-out:
-       if (req != NULL)
-               ptlrpc_req_finished(req);
-
-       osp_destroy_update_req(update);
-
-       RETURN(rc);
-}
-
 static void osp_md_object_read_lock(const struct lu_env *env,
                                    struct dt_object *dt, unsigned role)
 {
 static void osp_md_object_read_lock(const struct lu_env *env,
                                    struct dt_object *dt, unsigned role)
 {
@@ -722,11 +328,12 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
                               struct dt_rec *rec, const struct dt_key *key,
                               struct lustre_capa *capa)
 {
                               struct dt_rec *rec, const struct dt_key *key,
                               struct lustre_capa *capa)
 {
-       struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
+       struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
+       struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
+       struct dt_device        *dt_dev = &osp->opd_dt_dev;
        struct update_request   *update;
        struct ptlrpc_request   *req = NULL;
        int                     size = strlen((char *)key) + 1;
        struct update_request   *update;
        struct ptlrpc_request   *req = NULL;
        int                     size = strlen((char *)key) + 1;
-       char                    *name = (char *)key;
        int                     rc;
        struct update_reply     *reply;
        struct lu_fid           *fid;
        int                     rc;
        struct update_reply     *reply;
        struct lu_fid           *fid;
@@ -737,20 +344,20 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
         * just create an update buffer, instead of attaching the
         * update_remote list of the thandle.
         */
         * just create an update buffer, instead of attaching the
         * update_remote list of the thandle.
         */
-       update = osp_create_update_req(dt_dev);
+       update = out_create_update_req(dt_dev);
        if (IS_ERR(update))
                RETURN(PTR_ERR(update));
 
        if (IS_ERR(update))
                RETURN(PTR_ERR(update));
 
-       rc = osp_insert_update(env, update, OBJ_INDEX_LOOKUP,
-                              (struct lu_fid *)lu_object_fid(&dt->do_lu),
-                              1, &size, (char **)&name);
+       rc = out_insert_update(env, update, OBJ_INDEX_LOOKUP,
+                              lu_object_fid(&dt->do_lu), 1, &size,
+                              (const char **)&key);
        if (rc) {
                CERROR("%s: Insert update error: rc = %d\n",
                       dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
                GOTO(out, rc);
        }
 
        if (rc) {
                CERROR("%s: Insert update error: rc = %d\n",
                       dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
                GOTO(out, rc);
        }
 
-       rc = osp_remote_sync(env, dt_dev, update, &req);
+       rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
        if (rc < 0)
                GOTO(out, rc);
 
        if (rc < 0)
                GOTO(out, rc);
 
@@ -771,33 +378,38 @@ static int osp_md_index_lookup(const struct lu_env *env, struct dt_object *dt,
                GOTO(out, rc);
        }
 
                GOTO(out, rc);
        }
 
-       size = update_get_reply_buf(reply, (void **)&fid, 0);
-       if (size < 0)
-               GOTO(out, rc = size);
+       rc = update_get_reply_buf(reply, lbuf, 0);
+       if (rc < 0)
+               GOTO(out, rc);
 
 
-       if (size != sizeof(struct lu_fid)) {
-               CERROR("%s: lookup "DFID" %s wrong size %d: rc = %d\n",
+       if (lbuf->lb_len != sizeof(*fid)) {
+               CERROR("%s: lookup "DFID" %s wrong size %d\n",
                       dt_dev->dd_lu_dev.ld_obd->obd_name,
                       dt_dev->dd_lu_dev.ld_obd->obd_name,
-                      PFID(lu_object_fid(&dt->do_lu)), (char *)key, size, rc);
+                      PFID(lu_object_fid(&dt->do_lu)), (char *)key,
+                      (int)lbuf->lb_len);
                GOTO(out, rc = -EINVAL);
        }
 
                GOTO(out, rc = -EINVAL);
        }
 
+       fid = lbuf->lb_buf;
        fid_le_to_cpu(fid, fid);
        if (!fid_is_sane(fid)) {
        fid_le_to_cpu(fid, fid);
        if (!fid_is_sane(fid)) {
-               CERROR("%s: lookup "DFID" %s invalid fid "DFID": rc = %d\n",
+               CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n",
                       dt_dev->dd_lu_dev.ld_obd->obd_name,
                       dt_dev->dd_lu_dev.ld_obd->obd_name,
-                      PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid),
-                      rc);
+                      PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid));
                GOTO(out, rc = -EINVAL);
        }
                GOTO(out, rc = -EINVAL);
        }
+
        memcpy(rec, fid, sizeof(*fid));
        memcpy(rec, fid, sizeof(*fid));
+
+       GOTO(out, rc = 1);
+
 out:
        if (req != NULL)
                ptlrpc_req_finished(req);
 
 out:
        if (req != NULL)
                ptlrpc_req_finished(req);
 
-       osp_destroy_update_req(update);
+       out_destroy_update_req(update);
 
 
-       RETURN(rc);
+       return rc;
 }
 
 static int osp_md_declare_insert(const struct lu_env *env,
 }
 
 static int osp_md_declare_insert(const struct lu_env *env,
@@ -811,10 +423,10 @@ static int osp_md_declare_insert(const struct lu_env *env,
        struct lu_fid           *rec_fid = (struct lu_fid *)rec;
        int                     size[2] = {strlen((char *)key) + 1,
                                                  sizeof(*rec_fid)};
        struct lu_fid           *rec_fid = (struct lu_fid *)rec;
        int                     size[2] = {strlen((char *)key) + 1,
                                                  sizeof(*rec_fid)};
-       char                    *bufs[2] = {(char *)key, (char *)rec_fid};
+       const char              *bufs[2] = {(char *)key, (char *)rec_fid};
        int                     rc;
 
        int                     rc;
 
-       update = osp_find_create_update_loc(th, dt);
+       update = out_find_create_update_loc(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
@@ -830,7 +442,7 @@ static int osp_md_declare_insert(const struct lu_env *env,
 
        fid_cpu_to_le(rec_fid, rec_fid);
 
 
        fid_cpu_to_le(rec_fid, rec_fid);
 
-       rc = osp_insert_update(env, update, OBJ_INDEX_INSERT, fid,
+       rc = out_insert_update(env, update, OBJ_INDEX_INSERT, fid,
                               ARRAY_SIZE(size), size, bufs);
        return rc;
 }
                               ARRAY_SIZE(size), size, bufs);
        return rc;
 }
@@ -854,10 +466,9 @@ static int osp_md_declare_delete(const struct lu_env *env,
        struct update_request *update;
        struct lu_fid *fid;
        int size = strlen((char *)key) + 1;
        struct update_request *update;
        struct lu_fid *fid;
        int size = strlen((char *)key) + 1;
-       char *buf = (char *)key;
        int rc;
 
        int rc;
 
-       update = osp_find_create_update_loc(th, dt);
+       update = out_find_create_update_loc(th, dt);
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
        if (IS_ERR(update)) {
                CERROR("%s: Get OSP update buf failed: rc = %d\n",
                       dt->do_lu.lo_dev->ld_obd->obd_name,
@@ -867,8 +478,8 @@ static int osp_md_declare_delete(const struct lu_env *env,
 
        fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
 
 
        fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
 
-       rc = osp_insert_update(env, update, OBJ_INDEX_DELETE, fid, 1, &size,
-                              &buf);
+       rc = out_insert_update(env, update, OBJ_INDEX_DELETE, fid, 1, &size,
+                              (const char **)&key);
 
        return rc;
 }
 
        return rc;
 }
@@ -890,7 +501,7 @@ static int osp_md_index_delete(const struct lu_env *env,
  *
  * Note: for OSP, these index iterate api is only used to check
  * whether the directory is empty now (see mdd_dir_is_empty).
  *
  * Note: for OSP, these index iterate api is only used to check
  * whether the directory is empty now (see mdd_dir_is_empty).
- * Since dir_empty will be return by OBJ_ATTR_GET(see osp_md_attr_get/
+ * Since dir_empty will be return by OBJ_ATTR_GET(see osp_attr_get/
  * out_attr_get). So the implementation of these iterator is simplied
  * to make mdd_dir_is_empty happy. The real iterator should be
  * implemented, if we need it one day.
  * out_attr_get). So the implementation of these iterator is simplied
  * to make mdd_dir_is_empty happy. The real iterator should be
  * implemented, if we need it one day.
@@ -1001,91 +612,6 @@ static int osp_md_index_try(const struct lu_env *env,
        return 0;
 }
 
        return 0;
 }
 
-static int osp_md_attr_get(const struct lu_env *env,
-                          struct dt_object *dt, struct lu_attr *attr,
-                          struct lustre_capa *capa)
-{
-       struct osp_object     *obj = dt2osp_obj(dt);
-       struct dt_device      *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
-       struct update_request *update = NULL;
-       struct ptlrpc_request *req = NULL;
-       int rc;
-       ENTRY;
-
-       /* Because it needs send the update buffer right away,
-        * just create an update buffer, instead of attaching the
-        * update_remote list of the thandle.
-        */
-       update = osp_create_update_req(dt_dev);
-       if (IS_ERR(update))
-               RETURN(PTR_ERR(update));
-
-       rc = osp_insert_update(env, update, OBJ_ATTR_GET,
-                              (struct lu_fid *)lu_object_fid(&dt->do_lu),
-                              0, NULL, NULL);
-       if (rc) {
-               CERROR("%s: Insert update error: rc = %d\n",
-                      dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
-               GOTO(out, rc);
-       }
-       dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
-
-       rc = osp_remote_sync(env, dt_dev, update, &req);
-       if (rc < 0)
-               GOTO(out, rc);
-
-       rc = osp_get_attr_from_req(env, req, attr, 0);
-       if (rc)
-               GOTO(out, rc);
-
-       if (attr->la_flags == 1)
-               obj->opo_empty = 0;
-       else
-               obj->opo_empty = 1;
-out:
-       if (req != NULL)
-               ptlrpc_req_finished(req);
-
-       osp_destroy_update_req(update);
-
-       RETURN(rc);
-}
-
-static int osp_md_declare_object_destroy(const struct lu_env *env,
-                                        struct dt_object *dt,
-                                        struct thandle *th)
-{
-       struct osp_object  *o = dt2osp_obj(dt);
-       int                 rc = 0;
-       ENTRY;
-
-       /*
-        * track objects to be destroyed via llog
-        */
-       rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th);
-
-       RETURN(rc);
-}
-
-static int osp_md_object_destroy(const struct lu_env *env,
-                                struct dt_object *dt, struct thandle *th)
-{
-       struct osp_object  *o = dt2osp_obj(dt);
-       int                 rc = 0;
-       ENTRY;
-
-       /*
-        * once transaction is committed put proper command on
-        * the queue going to our OST
-        */
-       rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL);
-
-       /* not needed in cache any more */
-       set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
-
-       RETURN(rc);
-}
-
 static int osp_md_object_lock(const struct lu_env *env,
                              struct dt_object *dt,
                              struct lustre_handle *lh,
 static int osp_md_object_lock(const struct lu_env *env,
                              struct dt_object *dt,
                              struct lustre_handle *lh,
@@ -1136,15 +662,15 @@ struct dt_object_operations osp_md_obj_ops = {
        .do_ref_add           = osp_md_object_ref_add,
        .do_declare_ref_del   = osp_md_declare_object_ref_del,
        .do_ref_del           = osp_md_object_ref_del,
        .do_ref_add           = osp_md_object_ref_add,
        .do_declare_ref_del   = osp_md_declare_object_ref_del,
        .do_ref_del           = osp_md_object_ref_del,
-       .do_declare_destroy   = osp_md_declare_object_destroy,
-       .do_destroy           = osp_md_object_destroy,
+       .do_declare_destroy   = osp_declare_object_destroy,
+       .do_destroy           = osp_object_destroy,
        .do_ah_init           = osp_md_ah_init,
        .do_ah_init           = osp_md_ah_init,
-       .do_attr_get          = osp_md_attr_get,
+       .do_attr_get          = osp_attr_get,
        .do_declare_attr_set  = osp_md_declare_attr_set,
        .do_attr_set          = osp_md_attr_set,
        .do_declare_attr_set  = osp_md_declare_attr_set,
        .do_attr_set          = osp_md_attr_set,
-       .do_declare_xattr_set = osp_md_declare_xattr_set,
-       .do_xattr_set         = osp_md_xattr_set,
-       .do_xattr_get         = osp_md_xattr_get,
+       .do_xattr_get         = osp_xattr_get,
+       .do_declare_xattr_set = osp_declare_xattr_set,
+       .do_xattr_set         = osp_xattr_set,
        .do_index_try         = osp_md_index_try,
        .do_object_lock       = osp_md_object_lock,
 };
        .do_index_try         = osp_md_index_try,
        .do_object_lock       = osp_md_object_lock,
 };
index f474f43..7d302c9 100644 (file)
 
 #include "osp_internal.h"
 
 
 #include "osp_internal.h"
 
+static inline bool is_ost_obj(struct lu_object *lo)
+{
+       return !lu2osp_dev(lo->lo_dev)->opd_connect_mdt;
+}
+
 static void osp_object_assign_fid(const struct lu_env *env,
                                 struct osp_device *d, struct osp_object *o)
 {
 static void osp_object_assign_fid(const struct lu_env *env,
                                 struct osp_device *d, struct osp_object *o)
 {
@@ -59,11 +64,329 @@ static void osp_object_assign_fid(const struct lu_env *env,
        lu_object_assign_fid(env, &o->opo_obj.do_lu, &osi->osi_fid);
 }
 
        lu_object_assign_fid(env, &o->opo_obj.do_lu, &osi->osi_fid);
 }
 
+static int osp_oac_init(struct osp_object *obj)
+{
+       struct osp_object_attr *ooa;
+
+       OBD_ALLOC_PTR(ooa);
+       if (ooa == NULL)
+               return -ENOMEM;
+
+       INIT_LIST_HEAD(&ooa->ooa_xattr_list);
+       spin_lock(&obj->opo_lock);
+       if (likely(obj->opo_ooa == NULL)) {
+               obj->opo_ooa = ooa;
+               spin_unlock(&obj->opo_lock);
+       } else {
+               spin_unlock(&obj->opo_lock);
+               OBD_FREE_PTR(ooa);
+       }
+
+       return 0;
+}
+
+static struct osp_xattr_entry *
+osp_oac_xattr_find_locked(struct osp_object_attr *ooa,
+                         const char *name, int namelen, bool unlink)
+{
+       struct osp_xattr_entry *oxe;
+
+       list_for_each_entry(oxe, &ooa->ooa_xattr_list, oxe_list) {
+               if (namelen == oxe->oxe_namelen &&
+                   strncmp(name, oxe->oxe_buf, namelen) == 0) {
+                       if (unlink)
+                               list_del_init(&oxe->oxe_list);
+                       else
+                               atomic_inc(&oxe->oxe_ref);
+
+                       return oxe;
+               }
+       }
+
+       return NULL;
+}
+
+static struct osp_xattr_entry *osp_oac_xattr_find(struct osp_object *obj,
+                                                 const char *name)
+{
+       struct osp_xattr_entry *oxe = NULL;
+
+       spin_lock(&obj->opo_lock);
+       if (obj->opo_ooa != NULL)
+               oxe = osp_oac_xattr_find_locked(obj->opo_ooa, name,
+                                               strlen(name), false);
+       spin_unlock(&obj->opo_lock);
+
+       return oxe;
+}
+
+static struct osp_xattr_entry *
+osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, int len)
+{
+       struct osp_object_attr *ooa     = obj->opo_ooa;
+       struct osp_xattr_entry *oxe;
+       struct osp_xattr_entry *tmp     = NULL;
+       int                     namelen = strlen(name);
+       int                     size    = sizeof(*oxe) + namelen + 1 + len;
+
+       LASSERT(ooa != NULL);
+
+       oxe = osp_oac_xattr_find(obj, name);
+       if (oxe != NULL)
+               return oxe;
+
+       OBD_ALLOC(oxe, size);
+       if (unlikely(oxe == NULL))
+               return NULL;
+
+       INIT_LIST_HEAD(&oxe->oxe_list);
+       oxe->oxe_buflen = size;
+       oxe->oxe_namelen = namelen;
+       memcpy(oxe->oxe_buf, name, namelen);
+       oxe->oxe_value = oxe->oxe_buf + namelen + 1;
+       /* One ref is for the caller, the other is for the entry on the list. */
+       atomic_set(&oxe->oxe_ref, 2);
+
+       spin_lock(&obj->opo_lock);
+       tmp = osp_oac_xattr_find_locked(ooa, name, namelen, false);
+       if (tmp == NULL)
+               list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
+       spin_unlock(&obj->opo_lock);
+
+       if (tmp != NULL) {
+               OBD_FREE(oxe, size);
+               oxe = tmp;
+       }
+
+       return oxe;
+}
+
+static struct osp_xattr_entry *
+osp_oac_xattr_replace(struct osp_object *obj,
+                     struct osp_xattr_entry **poxe, int len)
+{
+       struct osp_object_attr *ooa     = obj->opo_ooa;
+       struct osp_xattr_entry *old     = *poxe;
+       struct osp_xattr_entry *oxe;
+       struct osp_xattr_entry *tmp     = NULL;
+       int                     namelen = old->oxe_namelen;
+       int                     size    = sizeof(*oxe) + namelen + 1 + len;
+
+       LASSERT(ooa != NULL);
+
+       OBD_ALLOC(oxe, size);
+       if (unlikely(oxe == NULL))
+               return NULL;
+
+       INIT_LIST_HEAD(&oxe->oxe_list);
+       oxe->oxe_buflen = size;
+       oxe->oxe_namelen = namelen;
+       memcpy(oxe->oxe_buf, old->oxe_buf, namelen);
+       oxe->oxe_value = oxe->oxe_buf + namelen + 1;
+       /* One ref is for the caller, the other is for the entry on the list. */
+       atomic_set(&oxe->oxe_ref, 2);
+
+       spin_lock(&obj->opo_lock);
+       tmp = osp_oac_xattr_find_locked(ooa, oxe->oxe_buf, namelen, true);
+       list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
+       spin_unlock(&obj->opo_lock);
+
+       *poxe = tmp;
+       LASSERT(tmp != NULL);
+
+       return oxe;
+}
+
+static inline void osp_oac_xattr_put(struct osp_xattr_entry *oxe)
+{
+       if (atomic_dec_and_test(&oxe->oxe_ref)) {
+               LASSERT(list_empty(&oxe->oxe_list));
+
+               OBD_FREE(oxe, oxe->oxe_buflen);
+       }
+}
+
+static int osp_get_attr_from_reply(const struct lu_env *env,
+                                  struct update_reply *reply,
+                                  struct lu_attr *attr,
+                                  struct osp_object *obj, int index)
+{
+       struct osp_thread_info  *osi    = osp_env_info(env);
+       struct lu_buf           *rbuf   = &osi->osi_lb2;
+       struct obdo             *lobdo  = &osi->osi_obdo;
+       struct obdo             *wobdo;
+       int                      rc;
+
+       rc = update_get_reply_buf(reply, rbuf, index);
+       if (rc < 0)
+               return rc;
+
+       wobdo = rbuf->lb_buf;
+       if (rbuf->lb_len != sizeof(*wobdo))
+               return -EPROTO;
+
+       obdo_le_to_cpu(wobdo, wobdo);
+       lustre_get_wire_obdo(NULL, lobdo, wobdo);
+       spin_lock(&obj->opo_lock);
+       if (obj->opo_ooa != NULL) {
+               la_from_obdo(&obj->opo_ooa->ooa_attr, lobdo, lobdo->o_valid);
+               if (attr != NULL)
+                       *attr = obj->opo_ooa->ooa_attr;
+       } else {
+               LASSERT(attr != NULL);
+
+               la_from_obdo(attr, lobdo, lobdo->o_valid);
+       }
+       spin_unlock(&obj->opo_lock);
+
+       return 0;
+}
+
+static int osp_attr_get_interpterer(const struct lu_env *env,
+                                   struct update_reply *reply,
+                                   struct osp_object *obj,
+                                   void *data, int index, int rc)
+{
+       struct lu_attr *attr = data;
+
+       LASSERT(obj->opo_ooa != NULL);
+
+       if (rc == 0) {
+               osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS;
+               obj->opo_non_exist = 0;
+
+               return osp_get_attr_from_reply(env, reply, NULL, obj, index);
+       } else {
+               if (rc == -ENOENT) {
+                       osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS;
+                       obj->opo_non_exist = 1;
+               }
+
+               spin_lock(&obj->opo_lock);
+               attr->la_valid = 0;
+               spin_unlock(&obj->opo_lock);
+       }
+
+       return 0;
+}
+
+static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt,
+                               struct lustre_capa *capa)
+{
+       struct osp_object       *obj    = dt2osp_obj(dt);
+       struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
+       struct update_request   *update;
+       int                      rc     = 0;
+
+       if (obj->opo_ooa == NULL) {
+               rc = osp_oac_init(obj);
+               if (rc != 0)
+                       return rc;
+       }
+
+       mutex_lock(&osp->opd_async_requests_mutex);
+       update = osp_find_or_create_async_update_request(osp);
+       if (IS_ERR(update))
+               rc = PTR_ERR(update);
+       else
+               rc = osp_insert_async_update(env, update, OBJ_ATTR_GET, obj, 0,
+                                            NULL, NULL,
+                                            &obj->opo_ooa->ooa_attr,
+                                            osp_attr_get_interpterer);
+       mutex_unlock(&osp->opd_async_requests_mutex);
+
+       return rc;
+}
+
+int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
+                struct lu_attr *attr, struct lustre_capa *capa)
+{
+       struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
+       struct osp_object       *obj    = dt2osp_obj(dt);
+       struct dt_device        *dev    = &osp->opd_dt_dev;
+       struct update_request   *update;
+       struct update_reply     *reply;
+       struct ptlrpc_request   *req    = NULL;
+       int                      rc     = 0;
+       ENTRY;
+
+       if (is_ost_obj(&dt->do_lu) && obj->opo_non_exist)
+               RETURN(-ENOENT);
+
+       if (obj->opo_ooa != NULL) {
+               spin_lock(&obj->opo_lock);
+               if (obj->opo_ooa->ooa_attr.la_valid != 0) {
+                       *attr = obj->opo_ooa->ooa_attr;
+                       spin_unlock(&obj->opo_lock);
+
+                       RETURN(0);
+               }
+               spin_unlock(&obj->opo_lock);
+       }
+
+       update = out_create_update_req(dev);
+       if (IS_ERR(update))
+               RETURN(PTR_ERR(update));
+
+       rc = out_insert_update(env, update, OBJ_ATTR_GET,
+                              lu_object_fid(&dt->do_lu), 0, NULL, NULL);
+       if (rc != 0) {
+               CERROR("%s: Insert update error "DFID": rc = %d\n",
+                      dev->dd_lu_dev.ld_obd->obd_name,
+                      PFID(lu_object_fid(&dt->do_lu)), rc);
+
+               GOTO(out, rc);
+       }
+
+       rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
+       if (rc != 0) {
+               if (rc == -ENOENT) {
+                       osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS;
+                       obj->opo_non_exist = 1;
+               } else {
+                       CERROR("%s:osp_attr_get update error "DFID": rc = %d\n",
+                              dev->dd_lu_dev.ld_obd->obd_name,
+                              PFID(lu_object_fid(&dt->do_lu)), rc);
+               }
+
+               GOTO(out, rc);
+       }
+
+       osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS;
+       obj->opo_non_exist = 0;
+       reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
+                                            UPDATE_BUFFER_SIZE);
+       if (reply == NULL || reply->ur_version != UPDATE_REPLY_V1)
+               GOTO(out, rc = -EPROTO);
+
+       rc = osp_get_attr_from_reply(env, reply, attr, obj, 0);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       if (!is_ost_obj(&dt->do_lu)) {
+               if (attr->la_flags == 1)
+                       obj->opo_empty = 0;
+               else
+                       obj->opo_empty = 1;
+       }
+
+       GOTO(out, rc = 0);
+
+out:
+       if (req != NULL)
+               ptlrpc_req_finished(req);
+
+       out_destroy_update_req(update);
+
+       return rc;
+}
+
 static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
                                const struct lu_attr *attr, struct thandle *th)
 {
        struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
        struct osp_object       *o = dt2osp_obj(dt);
 static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
                                const struct lu_attr *attr, struct thandle *th)
 {
        struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
        struct osp_object       *o = dt2osp_obj(dt);
+       struct lu_attr          *la;
        int                      rc = 0;
 
        ENTRY;
        int                      rc = 0;
 
        ENTRY;
@@ -112,8 +435,23 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
         * track all UID/GID changes via llog
         */
        rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th);
         * track all UID/GID changes via llog
         */
        rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th);
+       if (rc != 0 || o->opo_ooa == NULL)
+               RETURN(rc);
 
 
-       RETURN(rc);
+       la = &o->opo_ooa->ooa_attr;
+       spin_lock(&o->opo_lock);
+       if (attr->la_valid & LA_UID) {
+               la->la_uid = attr->la_uid;
+               la->la_valid |= LA_UID;
+       }
+
+       if (attr->la_valid & LA_GID) {
+               la->la_gid = attr->la_gid;
+               la->la_valid |= LA_GID;
+       }
+       spin_unlock(&o->opo_lock);
+
+       RETURN(0);
 }
 
 static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
 }
 
 static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
@@ -149,6 +487,373 @@ static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
+static int osp_xattr_get_interpterer(const struct lu_env *env,
+                                    struct update_reply *reply,
+                                    struct osp_object *obj,
+                                    void *data, int index, int rc)
+{
+       struct osp_object_attr  *ooa  = obj->opo_ooa;
+       struct osp_xattr_entry  *oxe  = data;
+       struct lu_buf           *rbuf = &osp_env_info(env)->osi_lb2;
+
+       LASSERT(ooa != NULL);
+
+       if (rc == 0) {
+               int len = sizeof(*oxe) + oxe->oxe_namelen + 1;
+
+               rc = update_get_reply_buf(reply, rbuf, index);
+               if (rc < 0 || rbuf->lb_len > (oxe->oxe_buflen - len)) {
+                       spin_lock(&obj->opo_lock);
+                       oxe->oxe_ready = 0;
+                       spin_unlock(&obj->opo_lock);
+                       osp_oac_xattr_put(oxe);
+
+                       return rc < 0 ? rc : -ERANGE;
+               }
+
+               spin_lock(&obj->opo_lock);
+               oxe->oxe_vallen = rbuf->lb_len;
+               memcpy(oxe->oxe_value, rbuf->lb_buf, rbuf->lb_len);
+               oxe->oxe_exist = 1;
+               oxe->oxe_ready = 1;
+               spin_unlock(&obj->opo_lock);
+       } else if (rc == -ENOENT || rc == -ENODATA) {
+               spin_lock(&obj->opo_lock);
+               oxe->oxe_exist = 0;
+               oxe->oxe_ready = 1;
+               spin_unlock(&obj->opo_lock);
+       } else {
+               spin_lock(&obj->opo_lock);
+               oxe->oxe_ready = 0;
+               spin_unlock(&obj->opo_lock);
+       }
+
+       osp_oac_xattr_put(oxe);
+
+       return 0;
+}
+
+static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt,
+                                struct lu_buf *buf, const char *name,
+                                struct lustre_capa *capa)
+{
+       struct osp_object       *obj     = dt2osp_obj(dt);
+       struct osp_device       *osp     = lu2osp_dev(dt->do_lu.lo_dev);
+       struct update_request   *update;
+       struct osp_xattr_entry  *oxe;
+       int                      namelen = strlen(name);
+       int                      rc      = 0;
+
+       LASSERT(buf != NULL);
+       LASSERT(name != NULL);
+
+       /* If only for xattr size, return directly. */
+       if (unlikely(buf->lb_len == 0))
+               return 0;
+
+       if (obj->opo_ooa == NULL) {
+               rc = osp_oac_init(obj);
+               if (rc != 0)
+                       return rc;
+       }
+
+       oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len);
+       if (oxe == NULL)
+               return -ENOMEM;
+
+       mutex_lock(&osp->opd_async_requests_mutex);
+       update = osp_find_or_create_async_update_request(osp);
+       if (IS_ERR(update)) {
+               rc = PTR_ERR(update);
+               mutex_unlock(&osp->opd_async_requests_mutex);
+               osp_oac_xattr_put(oxe);
+       } else {
+               rc = osp_insert_async_update(env, update, OBJ_XATTR_GET, obj,
+                                            1, &namelen, &name, oxe,
+                                            osp_xattr_get_interpterer);
+               if (rc != 0) {
+                       mutex_unlock(&osp->opd_async_requests_mutex);
+                       osp_oac_xattr_put(oxe);
+               } else {
+                       /* XXX: Currently, we trigger the batched async OUT
+                        *      RPC via dt_declare_xattr_get(). It is not
+                        *      perfect solution, but works well now.
+                        *
+                        *      We will improve it in the future. */
+                       update = osp->opd_async_requests;
+                       if (update != NULL && update->ur_buf != NULL &&
+                           update->ur_buf->ub_count > 0) {
+                               osp->opd_async_requests = NULL;
+                               mutex_unlock(&osp->opd_async_requests_mutex);
+                               rc = osp_unplug_async_update(env, osp, update);
+                       } else {
+                               mutex_unlock(&osp->opd_async_requests_mutex);
+                       }
+               }
+       }
+
+       return rc;
+}
+
+int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
+                 struct lu_buf *buf, const char *name,
+                 struct lustre_capa *capa)
+{
+       struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
+       struct osp_object       *obj    = dt2osp_obj(dt);
+       struct dt_device        *dev    = &osp->opd_dt_dev;
+       struct lu_buf           *rbuf   = &osp_env_info(env)->osi_lb2;
+       struct update_request   *update = NULL;
+       struct ptlrpc_request   *req    = NULL;
+       struct update_reply     *reply;
+       struct osp_xattr_entry  *oxe    = NULL;
+       const char              *dname  = dt->do_lu.lo_dev->ld_obd->obd_name;
+       int                      namelen;
+       int                      rc     = 0;
+       ENTRY;
+
+       LASSERT(buf != NULL);
+       LASSERT(name != NULL);
+
+       if (unlikely(obj->opo_non_exist))
+               RETURN(-ENOENT);
+
+       oxe = osp_oac_xattr_find(obj, name);
+       if (oxe != NULL) {
+               spin_lock(&obj->opo_lock);
+               if (oxe->oxe_ready) {
+                       if (!oxe->oxe_exist)
+                               GOTO(unlock, rc = -ENODATA);
+
+                       if (buf->lb_buf == NULL)
+                               GOTO(unlock, rc = oxe->oxe_vallen);
+
+                       if (buf->lb_len < oxe->oxe_vallen)
+                               GOTO(unlock, rc = -ERANGE);
+
+                       memcpy(buf->lb_buf, oxe->oxe_value, oxe->oxe_vallen);
+
+                       GOTO(unlock, rc = oxe->oxe_vallen);
+
+unlock:
+                       spin_unlock(&obj->opo_lock);
+                       osp_oac_xattr_put(oxe);
+
+                       return rc;
+               }
+               spin_unlock(&obj->opo_lock);
+       }
+
+       update = out_create_update_req(dev);
+       if (IS_ERR(update))
+               GOTO(out, rc = PTR_ERR(update));
+
+       namelen = strlen(name);
+       rc = out_insert_update(env, update, OBJ_XATTR_GET,
+                              lu_object_fid(&dt->do_lu), 1, &namelen, &name);
+       if (rc != 0) {
+               CERROR("%s: Insert update error "DFID": rc = %d\n",
+                      dname, PFID(lu_object_fid(&dt->do_lu)), rc);
+
+               GOTO(out, rc);
+       }
+
+       rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
+       if (rc != 0) {
+               if (obj->opo_ooa == NULL)
+                       GOTO(out, rc);
+
+               if (oxe == NULL)
+                       oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len);
+
+               if (oxe == NULL) {
+                       CWARN("%s: Fail to add xattr (%s) to cache for "
+                             DFID" (1): rc = %d\n", dname, name,
+                             PFID(lu_object_fid(&dt->do_lu)), rc);
+
+                       GOTO(out, rc);
+               }
+
+               spin_lock(&obj->opo_lock);
+               if (rc == -ENOENT || rc == -ENODATA) {
+                       oxe->oxe_exist = 0;
+                       oxe->oxe_ready = 1;
+               } else {
+                       oxe->oxe_ready = 0;
+               }
+               spin_unlock(&obj->opo_lock);
+
+               GOTO(out, rc);
+       }
+
+       reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
+                                           UPDATE_BUFFER_SIZE);
+       if (reply->ur_version != UPDATE_REPLY_V1) {
+               CERROR("%s: Wrong version %x expected %x "DFID": rc = %d\n",
+                      dname, reply->ur_version, UPDATE_REPLY_V1,
+                      PFID(lu_object_fid(&dt->do_lu)), -EPROTO);
+
+               GOTO(out, rc = -EPROTO);
+       }
+
+       rc = update_get_reply_buf(reply, rbuf, 0);
+       if (rc < 0)
+               GOTO(out, rc);
+
+       LASSERT(rbuf->lb_len > 0 && rbuf->lb_len < PAGE_CACHE_SIZE);
+
+       if (buf->lb_buf == NULL)
+               GOTO(out, rc = rbuf->lb_len);
+
+       if (unlikely(buf->lb_len < rbuf->lb_len))
+               GOTO(out, rc = -ERANGE);
+
+       memcpy(buf->lb_buf, rbuf->lb_buf, rbuf->lb_len);
+       rc = rbuf->lb_len;
+       if (obj->opo_ooa == NULL)
+               GOTO(out, rc);
+
+       if (oxe == NULL) {
+               oxe = osp_oac_xattr_find_or_add(obj, name, rbuf->lb_len);
+               if (oxe == NULL) {
+                       CWARN("%s: Fail to add xattr (%s) to "
+                             "cache for "DFID" (2): rc = %d\n",
+                             dname, name, PFID(lu_object_fid(&dt->do_lu)), rc);
+
+                       GOTO(out, rc);
+               }
+       }
+
+       if (oxe->oxe_buflen - oxe->oxe_namelen - 1 < rbuf->lb_len) {
+               struct osp_xattr_entry *old = oxe;
+               struct osp_xattr_entry *tmp;
+
+               tmp = osp_oac_xattr_replace(obj, &old, rbuf->lb_len);
+               osp_oac_xattr_put(oxe);
+               oxe = tmp;
+               if (tmp == NULL) {
+                       CWARN("%s: Fail to update xattr (%s) to "
+                             "cache for "DFID": rc = %d\n",
+                             dname, name, PFID(lu_object_fid(&dt->do_lu)), rc);
+                       spin_lock(&obj->opo_lock);
+                       oxe->oxe_ready = 0;
+                       spin_unlock(&obj->opo_lock);
+
+                       GOTO(out, rc);
+               }
+
+               /* Drop the ref for entry on list. */
+               osp_oac_xattr_put(old);
+       }
+
+       spin_lock(&obj->opo_lock);
+       oxe->oxe_vallen = rbuf->lb_len;
+       memcpy(oxe->oxe_value, rbuf->lb_buf, rbuf->lb_len);
+       oxe->oxe_exist = 1;
+       oxe->oxe_ready = 1;
+       spin_unlock(&obj->opo_lock);
+
+       GOTO(out, rc);
+
+out:
+       if (req != NULL)
+               ptlrpc_req_finished(req);
+
+       if (update != NULL && !IS_ERR(update))
+               out_destroy_update_req(update);
+
+       if (oxe != NULL)
+               osp_oac_xattr_put(oxe);
+
+       return rc;
+}
+
+int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                         const struct lu_buf *buf, const char *name,
+                         int flag, struct thandle *th)
+{
+       struct osp_object       *o       = dt2osp_obj(dt);
+       struct update_request   *update;
+       struct lu_fid           *fid;
+       struct osp_xattr_entry  *oxe;
+       int                     sizes[3] = {strlen(name), buf->lb_len,
+                                           sizeof(int)};
+       char                    *bufs[3] = {(char *)name, (char *)buf->lb_buf };
+       int                     rc;
+
+       LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
+
+       update = out_find_create_update_loc(th, dt);
+       if (IS_ERR(update)) {
+               CERROR("%s: Get OSP update buf failed "DFID": rc = %d\n",
+                      dt->do_lu.lo_dev->ld_obd->obd_name,
+                      PFID(lu_object_fid(&dt->do_lu)),
+                      (int)PTR_ERR(update));
+
+               return PTR_ERR(update);
+       }
+
+       flag = cpu_to_le32(flag);
+       bufs[2] = (char *)&flag;
+
+       fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
+       rc = out_insert_update(env, update, OBJ_XATTR_SET, fid,
+                              ARRAY_SIZE(sizes), sizes, (const char **)bufs);
+       if (rc != 0 || o->opo_ooa == NULL)
+               return rc;
+
+       oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
+       if (oxe == NULL) {
+               CWARN("%s: Fail to add xattr (%s) to cache for "DFID
+                     ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name,
+                     name, PFID(lu_object_fid(&dt->do_lu)), rc);
+
+               return 0;
+       }
+
+       if (oxe->oxe_buflen - oxe->oxe_namelen - 1 < buf->lb_len) {
+               struct osp_xattr_entry *old = oxe;
+               struct osp_xattr_entry *tmp;
+
+               tmp = osp_oac_xattr_replace(o, &old, buf->lb_len);
+               osp_oac_xattr_put(oxe);
+               oxe = tmp;
+               if (tmp == NULL) {
+                       CWARN("%s: Fail to update xattr (%s) to cache for "DFID
+                             ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name,
+                             name, PFID(lu_object_fid(&dt->do_lu)), rc);
+                       spin_lock(&o->opo_lock);
+                       oxe->oxe_ready = 0;
+                       spin_unlock(&o->opo_lock);
+
+                       return 0;
+               }
+
+               /* Drop the ref for entry on list. */
+               osp_oac_xattr_put(old);
+       }
+
+       spin_lock(&o->opo_lock);
+       oxe->oxe_vallen = buf->lb_len;
+       memcpy(oxe->oxe_value, buf->lb_buf, buf->lb_len);
+       oxe->oxe_exist = 1;
+       oxe->oxe_ready = 1;
+       spin_unlock(&o->opo_lock);
+       osp_oac_xattr_put(oxe);
+
+       return 0;
+}
+
+int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
+                 const struct lu_buf *buf, const char *name, int fl,
+                 struct thandle *th, struct lustre_capa *capa)
+{
+       CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name,
+              PFID(&dt->do_lu.lo_header->loh_fid));
+
+       return 0;
+}
+
 static int osp_declare_object_create(const struct lu_env *env,
                                     struct dt_object *dt,
                                     struct lu_attr *attr,
 static int osp_declare_object_create(const struct lu_env *env,
                                     struct dt_object *dt,
                                     struct lu_attr *attr,
@@ -231,6 +936,7 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt,
        struct lu_fid           *fid = &osi->osi_fid;
        ENTRY;
 
        struct lu_fid           *fid = &osi->osi_fid;
        ENTRY;
 
+       o->opo_non_exist = 0;
        if (o->opo_reserved) {
                /* regular case, fid is assigned holding trunsaction open */
                 osp_object_assign_fid(env, d, o);
        if (o->opo_reserved) {
                /* regular case, fid is assigned holding trunsaction open */
                 osp_object_assign_fid(env, d, o);
@@ -305,8 +1011,8 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt,
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-static int osp_declare_object_destroy(const struct lu_env *env,
-                                     struct dt_object *dt, struct thandle *th)
+int osp_declare_object_destroy(const struct lu_env *env,
+                              struct dt_object *dt, struct thandle *th)
 {
        struct osp_object       *o = dt2osp_obj(dt);
        int                      rc = 0;
 {
        struct osp_object       *o = dt2osp_obj(dt);
        int                      rc = 0;
@@ -321,14 +1027,15 @@ static int osp_declare_object_destroy(const struct lu_env *env,
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
-static int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
-                             struct thandle *th)
+int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
+                      struct thandle *th)
 {
        struct osp_object       *o = dt2osp_obj(dt);
        int                      rc = 0;
 
        ENTRY;
 
 {
        struct osp_object       *o = dt2osp_obj(dt);
        int                      rc = 0;
 
        ENTRY;
 
+       o->opo_non_exist = 1;
        /*
         * once transaction is committed put proper command on
         * the queue going to our OST
        /*
         * once transaction is committed put proper command on
         * the queue going to our OST
@@ -342,21 +1049,20 @@ static int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
 }
 
 struct dt_object_operations osp_obj_ops = {
 }
 
 struct dt_object_operations osp_obj_ops = {
+       .do_declare_attr_get    = osp_declare_attr_get,
+       .do_attr_get            = osp_attr_get,
        .do_declare_attr_set    = osp_declare_attr_set,
        .do_attr_set            = osp_attr_set,
        .do_declare_attr_set    = osp_declare_attr_set,
        .do_attr_set            = osp_attr_set,
+       .do_declare_xattr_get   = osp_declare_xattr_get,
+       .do_xattr_get           = osp_xattr_get,
+       .do_declare_xattr_set   = osp_declare_xattr_set,
+       .do_xattr_set           = osp_xattr_set,
        .do_declare_create      = osp_declare_object_create,
        .do_create              = osp_object_create,
        .do_declare_destroy     = osp_declare_object_destroy,
        .do_destroy             = osp_object_destroy,
 };
 
        .do_declare_create      = osp_declare_object_create,
        .do_create              = osp_object_create,
        .do_declare_destroy     = osp_declare_object_destroy,
        .do_destroy             = osp_object_destroy,
 };
 
-static int is_ost_obj(struct lu_object *lo)
-{
-       struct osp_device  *osp  = lu2osp_dev(lo->lo_dev);
-
-       return !osp->opd_connect_mdt;
-}
-
 static int osp_object_init(const struct lu_env *env, struct lu_object *o,
                           const struct lu_object_conf *conf)
 {
 static int osp_object_init(const struct lu_env *env, struct lu_object *o,
                           const struct lu_object_conf *conf)
 {
@@ -364,20 +1070,24 @@ static int osp_object_init(const struct lu_env *env, struct lu_object *o,
        int                     rc = 0;
        ENTRY;
 
        int                     rc = 0;
        ENTRY;
 
+       spin_lock_init(&po->opo_lock);
+       o->lo_header->loh_attr |= LOHA_REMOTE;
+
        if (is_ost_obj(o)) {
                po->opo_obj.do_ops = &osp_obj_ops;
        } else {
        if (is_ost_obj(o)) {
                po->opo_obj.do_ops = &osp_obj_ops;
        } else {
-               struct lu_attr          *la = &osp_env_info(env)->osi_attr;
+               struct lu_attr *la = &osp_env_info(env)->osi_attr;
 
                po->opo_obj.do_ops = &osp_md_obj_ops;
 
                po->opo_obj.do_ops = &osp_md_obj_ops;
-               o->lo_header->loh_attr |= LOHA_REMOTE;
                rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o),
                                                     la, NULL);
                if (rc == 0)
                        o->lo_header->loh_attr |=
                                LOHA_EXISTS | (la->la_mode & S_IFMT);
                rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o),
                                                     la, NULL);
                if (rc == 0)
                        o->lo_header->loh_attr |=
                                LOHA_EXISTS | (la->la_mode & S_IFMT);
-               if (rc == -ENOENT)
+               if (rc == -ENOENT) {
+                       po->opo_non_exist = 1;
                        rc = 0;
                        rc = 0;
+               }
        }
        RETURN(rc);
 }
        }
        RETURN(rc);
 }
@@ -389,6 +1099,24 @@ static void osp_object_free(const struct lu_env *env, struct lu_object *o)
 
        dt_object_fini(&obj->opo_obj);
        lu_object_header_fini(h);
 
        dt_object_fini(&obj->opo_obj);
        lu_object_header_fini(h);
+       if (obj->opo_ooa != NULL) {
+               struct osp_xattr_entry *oxe;
+               struct osp_xattr_entry *tmp;
+               int                     count;
+
+               list_for_each_entry_safe(oxe, tmp,
+                                        &obj->opo_ooa->ooa_xattr_list,
+                                        oxe_list) {
+                       list_del(&oxe->oxe_list);
+                       count = atomic_read(&oxe->oxe_ref);
+                       LASSERTF(count == 1,
+                                "Still has %d users on the xattr entry %.*s\n",
+                                count - 1, oxe->oxe_namelen, oxe->oxe_buf);
+
+                       OBD_FREE(oxe, oxe->oxe_buflen);
+               }
+               OBD_FREE_PTR(obj->opo_ooa);
+       }
        OBD_SLAB_FREE_PTR(obj, osp_object_kmem);
 }
 
        OBD_SLAB_FREE_PTR(obj, osp_object_kmem);
 }
 
diff --git a/lustre/osp/osp_trans.c b/lustre/osp/osp_trans.c
new file mode 100644 (file)
index 0000000..b819572
--- /dev/null
@@ -0,0 +1,314 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2013, Intel Corporation.
+ */
+/*
+ * lustre/osp/osp_trans.c
+ *
+ * Author: Di Wang <di.wang@intel.com>
+ * Author: Fan, Yong <fan.yong@intel.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include "osp_internal.h"
+
+struct osp_async_update_args {
+       struct update_request   *oaua_update;
+};
+
+struct osp_async_update_item {
+       struct list_head                 oaui_list;
+       struct osp_object               *oaui_obj;
+       void                            *oaui_data;
+       osp_async_update_interpterer_t   oaui_interpterer;
+};
+
+static struct osp_async_update_item *
+osp_async_update_item_init(struct osp_object *obj, void *data,
+                          osp_async_update_interpterer_t interpterer)
+{
+       struct osp_async_update_item *oaui;
+
+       OBD_ALLOC_PTR(oaui);
+       if (oaui == NULL)
+               return NULL;
+
+       lu_object_get(osp2lu_obj(obj));
+       INIT_LIST_HEAD(&oaui->oaui_list);
+       oaui->oaui_obj = obj;
+       oaui->oaui_data = data;
+       oaui->oaui_interpterer = interpterer;
+
+       return oaui;
+}
+
+static void osp_async_update_item_fini(const struct lu_env *env,
+                                      struct osp_async_update_item *oaui)
+{
+       LASSERT(list_empty(&oaui->oaui_list));
+
+       lu_object_put(env, osp2lu_obj(oaui->oaui_obj));
+       OBD_FREE_PTR(oaui);
+}
+
+static int osp_async_update_interpret(const struct lu_env *env,
+                                     struct ptlrpc_request *req,
+                                     void *arg, int rc)
+{
+       struct update_reply             *reply  = NULL;
+       struct osp_async_update_args    *oaua   = arg;
+       struct update_request           *update = oaua->oaua_update;
+       struct osp_async_update_item    *oaui;
+       struct osp_async_update_item    *next;
+       int                              count  = 0;
+       int                              index  = 0;
+       int                              rc1    = 0;
+
+       if (rc == 0 || req->rq_repmsg != NULL) {
+               reply = req_capsule_server_sized_get(&req->rq_pill,
+                                                    &RMF_UPDATE_REPLY,
+                                                    UPDATE_BUFFER_SIZE);
+               if (reply == NULL || reply->ur_version != UPDATE_REPLY_V1)
+                       rc1 = -EPROTO;
+               else
+                       count = reply->ur_count;
+       } else {
+               rc1 = rc;
+       }
+
+       list_for_each_entry_safe(oaui, next, &update->ur_cb_items, oaui_list) {
+               list_del_init(&oaui->oaui_list);
+               if (index < count && reply->ur_lens[index] > 0) {
+                       char *ptr = update_get_buf_internal(reply, index, NULL);
+
+                       LASSERT(ptr != NULL);
+
+                       rc1 = le32_to_cpu(*(int *)ptr);
+               } else {
+                       rc1 = rc;
+                       if (unlikely(rc1 == 0))
+                               rc1 = -EINVAL;
+               }
+
+               oaui->oaui_interpterer(env, reply, oaui->oaui_obj,
+                                      oaui->oaui_data, index, rc1);
+               osp_async_update_item_fini(env, oaui);
+               index++;
+       }
+
+       out_destroy_update_req(update);
+
+       return 0;
+}
+
+int osp_unplug_async_update(const struct lu_env *env,
+                           struct osp_device *osp,
+                           struct update_request *update)
+{
+       struct osp_async_update_args    *args;
+       struct ptlrpc_request           *req = NULL;
+       int                              rc;
+
+       rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
+                                update->ur_buf, UPDATE_BUFFER_SIZE, &req);
+       if (rc != 0) {
+               struct osp_async_update_item *oaui;
+               struct osp_async_update_item *next;
+
+               list_for_each_entry_safe(oaui, next,
+                                        &update->ur_cb_items, oaui_list) {
+                       list_del_init(&oaui->oaui_list);
+                       oaui->oaui_interpterer(env, NULL, oaui->oaui_obj,
+                                              oaui->oaui_data, 0, rc);
+                       osp_async_update_item_fini(env, oaui);
+               }
+               out_destroy_update_req(update);
+       } else {
+               LASSERT(list_empty(&update->ur_list));
+
+               args = ptlrpc_req_async_args(req);
+               args->oaua_update = update;
+               req->rq_interpret_reply = osp_async_update_interpret;
+               ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+       }
+
+       return rc;
+}
+
+/* with osp::opd_async_requests_mutex held */
+struct update_request *
+osp_find_or_create_async_update_request(struct osp_device *osp)
+{
+       struct update_request *update = osp->opd_async_requests;
+
+       if (update != NULL)
+               return update;
+
+       update = out_create_update_req(&osp->opd_dt_dev);
+       if (!IS_ERR(update))
+               osp->opd_async_requests = update;
+
+       return update;
+}
+
+/* with osp::opd_async_requests_mutex held */
+int osp_insert_async_update(const struct lu_env *env,
+                           struct update_request *update, int op,
+                           struct osp_object *obj, int count,
+                           int *lens, const char **bufs, void *data,
+                           osp_async_update_interpterer_t interpterer)
+{
+       struct osp_async_update_item *oaui;
+       struct osp_device            *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
+       int                           rc  = 0;
+       ENTRY;
+
+       oaui = osp_async_update_item_init(obj, data, interpterer);
+       if (oaui == NULL)
+               RETURN(-ENOMEM);
+
+again:
+       rc = out_insert_update(env, update, op, lu_object_fid(osp2lu_obj(obj)),
+                              count, lens, bufs);
+       if (rc == -E2BIG) {
+               osp->opd_async_requests = NULL;
+               mutex_unlock(&osp->opd_async_requests_mutex);
+
+               rc = osp_unplug_async_update(env, osp, update);
+               mutex_lock(&osp->opd_async_requests_mutex);
+               if (rc != 0)
+                       GOTO(out, rc);
+
+               update = osp_find_or_create_async_update_request(osp);
+               if (IS_ERR(update))
+                       GOTO(out, rc = PTR_ERR(update));
+
+               goto again;
+       }
+
+       if (rc == 0)
+               list_add_tail(&oaui->oaui_list, &update->ur_cb_items);
+
+       GOTO(out, rc);
+
+out:
+       if (rc != 0)
+               osp_async_update_item_fini(env, oaui);
+
+       return rc;
+}
+
+struct thandle *osp_trans_create(const struct lu_env *env,
+                                struct dt_device *d)
+{
+       struct thandle *th;
+
+       OBD_ALLOC_PTR(th);
+       if (unlikely(th == NULL))
+               return ERR_PTR(-ENOMEM);
+
+       th->th_dev = d;
+       th->th_tags = LCT_TX_HANDLE;
+       INIT_LIST_HEAD(&th->th_remote_update_list);
+
+       return th;
+}
+
+static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
+                            struct thandle *th)
+{
+       struct update_request   *update = th->th_current_request;
+       int                      rc     = 0;
+
+       if (unlikely(update == NULL || update->ur_buf == NULL ||
+                    update->ur_buf->ub_count == 0))
+               return 0;
+
+       if (is_remote_trans(th)) {
+               struct osp_async_update_args    *args;
+               struct ptlrpc_request           *req;
+
+               list_del_init(&update->ur_list);
+               th->th_current_request = NULL;
+               rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
+                                        update->ur_buf,
+                                        UPDATE_BUFFER_SIZE, &req);
+               if (rc == 0) {
+                       args = ptlrpc_req_async_args(req);
+                       args->oaua_update = update;
+                       req->rq_interpret_reply =
+                               osp_async_update_interpret;
+                       ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+               } else {
+                       out_destroy_update_req(update);
+               }
+       } else {
+               th->th_sync = 1;
+               rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
+                                    update, NULL);
+       }
+
+       return rc;
+}
+
+int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
+                   struct thandle *th)
+{
+       int rc = 0;
+
+       if (!is_remote_trans(th))
+               rc = osp_trans_trigger(env, dt2osp_dev(dt), th);
+
+       return rc;
+}
+
+int osp_trans_stop(const struct lu_env *env, struct thandle *th)
+{
+       struct update_request   *update = th->th_current_request;
+       int                      rc     = 0;
+
+       if (is_remote_trans(th)) {
+               LASSERT(update == NULL);
+
+               update = out_find_update(th, th->th_dev);
+               th->th_current_request = update;
+               if (th->th_result == 0)
+                       rc = osp_trans_trigger(env, dt2osp_dev(th->th_dev), th);
+               else
+                       rc = th->th_result;
+
+               if (th->th_current_request != NULL)
+                       out_destroy_update_req(update);
+
+               OBD_FREE_PTR(th);
+       } else {
+               LASSERT(update != NULL);
+
+               rc = update->ur_rc;
+               out_destroy_update_req(update);
+               th->th_current_request = NULL;
+       }
+
+       return rc;
+}
index ceb4cb0..66f81a7 100644 (file)
@@ -19,6 +19,7 @@ ptlrpc_objs += nrs_tbf.o errno.o
 
 target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o
 target_objs += $(TARGET)tgt_handler.o $(TARGET)out_handler.o
 
 target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o
 target_objs += $(TARGET)tgt_handler.o $(TARGET)out_handler.o
+target_objs += $(TARGET)out_lib.o $(TARGET)out_lib.o
 
 ptlrpc-objs := $(ldlm_objs) $(ptlrpc_objs)
 @SERVER_TRUE@ptlrpc-objs += $(target_objs)
 
 ptlrpc-objs := $(ldlm_objs) $(ptlrpc_objs)
 @SERVER_TRUE@ptlrpc-objs += $(target_objs)
@@ -39,6 +40,9 @@ interval_tree.c: @LUSTRE@/ldlm/interval_tree.c
 tgt_%.c: @LUSTRE@/target/tgt_%.c
        ln -sf $< $@
 
 tgt_%.c: @LUSTRE@/target/tgt_%.c
        ln -sf $< $@
 
+out_%.c: @LUSTRE@/target/out_%.c
+       ln -sf $< $@
+
 EXTRA_DIST := $(ptlrpc_objs:.o=.c) ptlrpc_internal.h
 EXTRA_DIST += $(TARGET)tgt_internal.h
 @SERVER_FALSE@EXTRA_DIST += $(target_objs:.o=.c)
 EXTRA_DIST := $(ptlrpc_objs:.o=.c) ptlrpc_internal.h
 EXTRA_DIST += $(TARGET)tgt_internal.h
 @SERVER_FALSE@EXTRA_DIST += $(target_objs:.o=.c)
index c364f40..09aa43c 100644 (file)
@@ -426,13 +426,10 @@ static int ptlrpcd(void *arg)
                }
        }
 #endif
                }
        }
 #endif
-        /*
-         * XXX So far only "client" ptlrpcd uses an environment. In
-         * the future, ptlrpcd thread (or a thread-set) has to given
-         * an argument, describing its "scope".
-         */
-        rc = lu_context_init(&env.le_ctx,
-                             LCT_CL_THREAD|LCT_REMEMBER|LCT_NOREF);
+       /* Both client and server (MDT/OST) may use the environment. */
+       rc = lu_context_init(&env.le_ctx, LCT_MD_THREAD | LCT_DT_THREAD |
+                                         LCT_CL_THREAD | LCT_REMEMBER |
+                                         LCT_NOREF);
        if (rc == 0) {
                rc = lu_context_init(env.le_ses,
                                     LCT_SESSION|LCT_REMEMBER|LCT_NOREF);
        if (rc == 0) {
                rc = lu_context_init(env.le_ses,
                                     LCT_SESSION|LCT_REMEMBER|LCT_NOREF);
index 59eee1d..24b4037 100644 (file)
@@ -31,4 +31,5 @@
 #
 
 MOSTLYCLEANFILES := @MOSTLYCLEANFILES@
 #
 
 MOSTLYCLEANFILES := @MOSTLYCLEANFILES@
-EXTRA_DIST = tgt_main.c tgt_lastrcvd.c tgt_handler.c tgt_internal.h out_handler.c
+EXTRA_DIST = tgt_main.c tgt_lastrcvd.c tgt_handler.c tgt_internal.h \
+            out_handler.c out_lib.c
index af53474..5c65021 100644 (file)
@@ -58,7 +58,7 @@ struct tx_arg *tx_add_exec(struct thandle_exec_args *ta, tx_exec_func_t func,
 }
 
 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
 }
 
 static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
-                       struct thandle_exec_args *ta)
+                       struct thandle_exec_args *ta, struct obd_export *exp)
 {
        memset(ta, 0, sizeof(*ta));
        ta->ta_handle = dt_trans_create(env, dt);
 {
        memset(ta, 0, sizeof(*ta));
        ta->ta_handle = dt_trans_create(env, dt);
@@ -68,16 +68,15 @@ static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
                return PTR_ERR(ta->ta_handle);
        }
        ta->ta_dev = dt;
                return PTR_ERR(ta->ta_handle);
        }
        ta->ta_dev = dt;
-       /*For phase I, sync for cross-ref operation*/
-       ta->ta_handle->th_sync = 1;
+       if (exp->exp_need_sync)
+               ta->ta_handle->th_sync = 1;
+
        return 0;
 }
 
 static int out_trans_start(const struct lu_env *env,
                           struct thandle_exec_args *ta)
 {
        return 0;
 }
 
 static int out_trans_start(const struct lu_env *env,
                           struct thandle_exec_args *ta)
 {
-       /* Always do sync commit for Phase I */
-       LASSERT(ta->ta_handle->th_sync != 0);
        return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
 }
 
        return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
 }
 
@@ -88,7 +87,6 @@ static int out_trans_stop(const struct lu_env *env,
        int rc;
 
        ta->ta_handle->th_result = err;
        int rc;
 
        ta->ta_handle->th_result = err;
-       LASSERT(ta->ta_handle->th_sync != 0);
        rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
        for (i = 0; i < ta->ta_argno; i++) {
                if (ta->ta_args[i].object != NULL) {
        rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
        for (i = 0; i < ta->ta_argno; i++) {
                if (ta->ta_args[i].object != NULL) {
@@ -487,7 +485,8 @@ out_unlock:
               0, rc);
 
        update_insert_reply(tti->tti_u.update.tti_update_reply, obdo,
               0, rc);
 
        update_insert_reply(tti->tti_u.update.tti_update_reply, obdo,
-                           sizeof(*obdo), 0, rc);
+                           sizeof(*obdo),
+                           tti->tti_u.update.tti_update_reply_index, rc);
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -501,6 +500,7 @@ static int out_xattr_get(struct tgt_session_info *tsi)
        struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
        char                    *name;
        void                    *ptr;
        struct dt_object        *obj = tti->tti_u.update.tti_dt_object;
        char                    *name;
        void                    *ptr;
+       int                      idx = tti->tti_u.update.tti_update_reply_index;
        int                      rc;
 
        ENTRY;
        int                      rc;
 
        ENTRY;
@@ -512,7 +512,7 @@ static int out_xattr_get(struct tgt_session_info *tsi)
                RETURN(err_serious(-EPROTO));
        }
 
                RETURN(err_serious(-EPROTO));
        }
 
-       ptr = update_get_buf_internal(reply, 0, NULL);
+       ptr = update_get_buf_internal(reply, idx, NULL);
        LASSERT(ptr != NULL);
 
        /* The first 4 bytes(int) are used to store the result */
        LASSERT(ptr != NULL);
 
        /* The first 4 bytes(int) are used to store the result */
@@ -534,10 +534,14 @@ static int out_xattr_get(struct tgt_session_info *tsi)
        CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
               tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
               name, (int)lbuf->lb_len);
        CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
               tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
               name, (int)lbuf->lb_len);
+
+       GOTO(out, rc);
+
 out:
        *(int *)ptr = rc;
 out:
        *(int *)ptr = rc;
-       reply->ur_lens[0] = lbuf->lb_len + sizeof(int);
-       RETURN(rc);
+       reply->ur_lens[idx] = lbuf->lb_len + sizeof(int);
+
+       return rc;
 }
 
 static int out_index_lookup(struct tgt_session_info *tsi)
 }
 
 static int out_index_lookup(struct tgt_session_info *tsi)
@@ -587,7 +591,8 @@ out_unlock:
               0, rc);
 
        update_insert_reply(tti->tti_u.update.tti_update_reply,
               0, rc);
 
        update_insert_reply(tti->tti_u.update.tti_update_reply,
-                           &tti->tti_fid1, sizeof(tti->tti_fid1), 0, rc);
+                           &tti->tti_fid1, sizeof(tti->tti_fid1),
+                           tti->tti_u.update.tti_update_reply_index, rc);
        RETURN(rc);
 }
 
        RETURN(rc);
 }
 
@@ -1243,14 +1248,14 @@ int out_handle(struct tgt_session_info *tsi)
                RETURN(err_serious(-EPROTO));
        }
 
                RETURN(err_serious(-EPROTO));
        }
 
-       if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) {
+       if (ubuf->ub_magic != UPDATE_BUFFER_MAGIC) {
                CERROR("%s: invalid magic %x expect %x: rc = %d\n",
                CERROR("%s: invalid magic %x expect %x: rc = %d\n",
-                      tgt_name(tsi->tsi_tgt), le32_to_cpu(ubuf->ub_magic),
+                      tgt_name(tsi->tsi_tgt), ubuf->ub_magic,
                       UPDATE_BUFFER_MAGIC, -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
                       UPDATE_BUFFER_MAGIC, -EPROTO);
                RETURN(err_serious(-EPROTO));
        }
 
-       count = le32_to_cpu(ubuf->ub_count);
+       count = ubuf->ub_count;
        if (count <= 0) {
                CERROR("%s: No update!: rc = %d\n",
                       tgt_name(tsi->tsi_tgt), -EPROTO);
        if (count <= 0) {
                CERROR("%s: No update!: rc = %d\n",
                       tgt_name(tsi->tsi_tgt), -EPROTO);
@@ -1273,7 +1278,7 @@ int out_handle(struct tgt_session_info *tsi)
        update_init_reply_buf(update_reply, count);
        tti->tti_u.update.tti_update_reply = update_reply;
 
        update_init_reply_buf(update_reply, count);
        tti->tti_u.update.tti_update_reply = update_reply;
 
-       rc = out_tx_start(env, dt, ta);
+       rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
        if (rc != 0)
                RETURN(rc);
 
        if (rc != 0)
                RETURN(rc);
 
@@ -1295,7 +1300,7 @@ int out_handle(struct tgt_session_info *tsi)
                        if (rc != 0)
                                RETURN(rc);
 
                        if (rc != 0)
                                RETURN(rc);
 
-                       rc = out_tx_start(env, dt, ta);
+                       rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
                        if (rc != 0)
                                RETURN(rc);
                        old_batchid = update->u_batchid;
                        if (rc != 0)
                                RETURN(rc);
                        old_batchid = update->u_batchid;
diff --git a/lustre/target/out_lib.c b/lustre/target/out_lib.c
new file mode 100644 (file)
index 0000000..3fe8080
--- /dev/null
@@ -0,0 +1,234 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2013, Intel Corporation.
+ */
+/*
+ * lustre/target/out_lib.c
+ *
+ * Author: Di Wang <di.wang@intel.com>
+ * Author: Fan, Yong <fan.yong@intel.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_CLASS
+
+#include <lu_target.h>
+#include <lustre_update.h>
+#include <obd.h>
+
+struct update_request *out_find_update(struct thandle *th,
+                                      struct dt_device *dt_dev)
+{
+       struct update_request   *update;
+
+       list_for_each_entry(update, &th->th_remote_update_list, ur_list) {
+               if (update->ur_dt == dt_dev)
+                       return update;
+       }
+
+       return NULL;
+}
+EXPORT_SYMBOL(out_find_update);
+
+void out_destroy_update_req(struct update_request *update)
+{
+       if (update == NULL)
+               return;
+
+       LASSERT(list_empty(&update->ur_cb_items));
+
+       list_del(&update->ur_list);
+       if (update->ur_buf != NULL)
+               OBD_FREE_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
+
+       OBD_FREE_PTR(update);
+}
+EXPORT_SYMBOL(out_destroy_update_req);
+
+struct update_request *out_create_update_req(struct dt_device *dt)
+{
+       struct update_request *update;
+
+       OBD_ALLOC_PTR(update);
+       if (update == NULL)
+               return ERR_PTR(-ENOMEM);
+
+       OBD_ALLOC_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
+       if (update->ur_buf == NULL) {
+               OBD_FREE_PTR(update);
+
+               return ERR_PTR(-ENOMEM);
+       }
+
+       INIT_LIST_HEAD(&update->ur_list);
+       update->ur_dt = dt;
+       update->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC;
+       update->ur_buf->ub_count = 0;
+       INIT_LIST_HEAD(&update->ur_cb_items);
+
+       return update;
+}
+EXPORT_SYMBOL(out_create_update_req);
+
+/**
+ * Find one loc in th_dev/dev_obj_update for the update,
+ * Because only one thread can access this thandle, no need
+ * lock now.
+ */
+struct update_request *out_find_create_update_loc(struct thandle *th,
+                                                 struct dt_object *dt)
+{
+       struct dt_device        *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
+       struct update_request   *update;
+       ENTRY;
+
+       update = out_find_update(th, dt_dev);
+       if (update != NULL)
+               RETURN(update);
+
+       update = out_create_update_req(dt_dev);
+       if (IS_ERR(update))
+               RETURN(update);
+
+       list_add_tail(&update->ur_list, &th->th_remote_update_list);
+
+       RETURN(update);
+}
+EXPORT_SYMBOL(out_find_create_update_loc);
+
+int out_prep_update_req(const struct lu_env *env, struct obd_import *imp,
+                       const struct update_buf *ubuf, int ubuf_len,
+                       struct ptlrpc_request **reqp)
+{
+       struct ptlrpc_request  *req;
+       struct update_buf      *tmp;
+       int                     rc;
+       ENTRY;
+
+       req = ptlrpc_request_alloc(imp, &RQF_UPDATE_OBJ);
+       if (req == NULL)
+               RETURN(-ENOMEM);
+
+       req_capsule_set_size(&req->rq_pill, &RMF_UPDATE, RCL_CLIENT,
+                            UPDATE_BUFFER_SIZE);
+
+       rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, UPDATE_OBJ);
+       if (rc != 0) {
+               ptlrpc_req_finished(req);
+               RETURN(rc);
+       }
+
+       req_capsule_set_size(&req->rq_pill, &RMF_UPDATE_REPLY, RCL_SERVER,
+                            UPDATE_BUFFER_SIZE);
+
+       tmp = req_capsule_client_get(&req->rq_pill, &RMF_UPDATE);
+       memcpy(tmp, ubuf, ubuf_len);
+       ptlrpc_request_set_replen(req);
+       req->rq_request_portal = OUT_PORTAL;
+       req->rq_reply_portal = OSC_REPLY_PORTAL;
+       *reqp = req;
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(out_prep_update_req);
+
+int out_remote_sync(const struct lu_env *env, struct obd_import *imp,
+                   struct update_request *update,
+                   struct ptlrpc_request **reqp)
+{
+       struct ptlrpc_request   *req = NULL;
+       int                      rc;
+       ENTRY;
+
+       rc = out_prep_update_req(env, imp, update->ur_buf,
+                                UPDATE_BUFFER_SIZE, &req);
+       if (rc != 0)
+               RETURN(rc);
+
+       /* Note: some dt index api might return non-zero result here, like
+        * osd_index_ea_lookup, so we should only check rc < 0 here */
+       rc = ptlrpc_queue_wait(req);
+       if (rc < 0) {
+               ptlrpc_req_finished(req);
+               update->ur_rc = rc;
+               RETURN(rc);
+       }
+
+       if (reqp != NULL) {
+               *reqp = req;
+       } else {
+               update->ur_rc = rc;
+               ptlrpc_req_finished(req);
+       }
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(out_remote_sync);
+
+int out_insert_update(const struct lu_env *env, struct update_request *update,
+                     int op, const struct lu_fid *fid, int count,
+                     int *lens, const char **bufs)
+{
+       struct update_buf    *ubuf = update->ur_buf;
+       struct update        *obj_update;
+       char                 *ptr;
+       int                   i;
+       int                   update_length;
+       ENTRY;
+
+       obj_update = (struct update *)((char *)ubuf +
+                     cfs_size_round(update_buf_size(ubuf)));
+
+       /* Check update size to make sure it can fit into the buffer */
+       update_length = cfs_size_round(offsetof(struct update,
+                                      u_bufs[0]));
+       for (i = 0; i < count; i++)
+               update_length += cfs_size_round(lens[i]);
+
+       if (cfs_size_round(update_buf_size(ubuf)) + update_length >
+           UPDATE_BUFFER_SIZE || ubuf->ub_count >= UPDATE_MAX_OPS)
+               RETURN(-E2BIG);
+
+       if (count > UPDATE_BUF_COUNT)
+               RETURN(-E2BIG);
+
+       /* fill the update into the update buffer */
+       fid_cpu_to_le(&obj_update->u_fid, fid);
+       obj_update->u_type = cpu_to_le32(op);
+       obj_update->u_batchid = update->ur_batchid;
+       for (i = 0; i < count; i++)
+               obj_update->u_lens[i] = cpu_to_le32(lens[i]);
+
+       ptr = (char *)obj_update +
+                       cfs_size_round(offsetof(struct update, u_bufs[0]));
+       for (i = 0; i < count; i++)
+               LOGL(bufs[i], lens[i], ptr);
+
+       ubuf->ub_count++;
+
+       CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%lu\n",
+              update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf, PFID(fid),
+              ubuf->ub_count, op, count, update_buf_size(ubuf));
+
+       RETURN(0);
+}
+EXPORT_SYMBOL(out_insert_update);
index cff6b41..2164cda 100644 (file)
@@ -95,8 +95,8 @@ struct tx_arg {
 struct thandle_exec_args {
        struct thandle          *ta_handle;
        struct dt_device        *ta_dev;
 struct thandle_exec_args {
        struct thandle          *ta_handle;
        struct dt_device        *ta_dev;
-       int                      ta_err;
        struct tx_arg            ta_args[TX_MAX_OPS];
        struct tx_arg            ta_args[TX_MAX_OPS];
+       int                      ta_err;
        int                      ta_argno;   /* used args */
 };
 
        int                      ta_argno;   /* used args */
 };
 
index 17e4324..3264c1f 100644 (file)
@@ -488,7 +488,12 @@ void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
               ccb->lncc_exp->exp_client_uuid.uuid);
 
        spin_lock(&ccb->lncc_exp->exp_lock);
               ccb->lncc_exp->exp_client_uuid.uuid);
 
        spin_lock(&ccb->lncc_exp->exp_lock);
-       ccb->lncc_exp->exp_need_sync = 0;
+       /* XXX: Currently, we use per-export based sync/async policy for
+        *      the update via OUT RPC, it is coarse-grained policy, and
+        *      will be changed as per-request based by DNE II patches. */
+       if (!ccb->lncc_exp->exp_keep_sync)
+               ccb->lncc_exp->exp_need_sync = 0;
+
        spin_unlock(&ccb->lncc_exp->exp_lock);
        class_export_cb_put(ccb->lncc_exp);
 
        spin_unlock(&ccb->lncc_exp->exp_lock);
        class_export_cb_put(ccb->lncc_exp);