* lu_object_operations, but that would break existing symmetry.
*/
+ int (*do_declare_attr_get)(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lustre_capa *capa);
/**
* Return standard attributes.
*
const struct lu_attr *attr,
struct thandle *handle,
struct lustre_capa *capa);
+
+ int (*do_declare_xattr_get)(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lu_buf *buf,
+ const char *name,
+ struct lustre_capa *capa);
+
/**
* Return a value of an extended attribute.
*
return dt->do_ops->do_write_locked(env, dt);
}
+/**
+ * Call the object's do_declare_attr_get() method.
+ *
+ * LASSERTs that dt, dt->do_ops and the method pointer are all non-NULL,
+ * then passes env, dt and capa through unchanged and returns whatever
+ * the method returns.
+ */
+static inline int dt_declare_attr_get(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lustre_capa *capa)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_declare_attr_get);
+ return dt->do_ops->do_declare_attr_get(env, dt, capa);
+}
+
static inline int dt_attr_get(const struct lu_env *env, struct dt_object *dt,
struct lu_attr *la, void *arg)
{
return dt->do_ops->do_xattr_set(env, dt, buf, name, fl, th, capa);
}
+/**
+ * Call the object's do_declare_xattr_get() method.
+ *
+ * LASSERTs that dt, dt->do_ops and the method pointer are all non-NULL,
+ * then passes env, dt, buf, name and capa through unchanged and returns
+ * whatever the method returns.
+ */
+static inline int dt_declare_xattr_get(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lu_buf *buf,
+ const char *name,
+ struct lustre_capa *capa)
+{
+ LASSERT(dt);
+ LASSERT(dt->do_ops);
+ LASSERT(dt->do_ops->do_declare_xattr_get);
+ return dt->do_ops->do_declare_xattr_get(env, dt, buf, name, capa);
+}
+
static inline int dt_xattr_get(const struct lu_env *env,
struct dt_object *dt, struct lu_buf *buf,
const char *name, struct lustre_capa *capa)
int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
loff_t off);
+/* target/out_lib.c */
+struct update_request *
+out_find_update(struct thandle *th, struct dt_device *dt_dev);
+void out_destroy_update_req(struct update_request *update);
+struct update_request *out_create_update_req(struct dt_device *dt);
+struct update_request *out_find_create_update_loc(struct thandle *th,
+ struct dt_object *dt);
+int out_prep_update_req(const struct lu_env *env, struct obd_import *imp,
+ const struct update_buf *ubuf, int ubuf_len,
+ struct ptlrpc_request **reqp);
+int out_remote_sync(const struct lu_env *env, struct obd_import *imp,
+ struct update_request *update,
+ struct ptlrpc_request **reqp);
+int out_insert_update(const struct lu_env *env, struct update_request *update,
+ int op, const struct lu_fid *fid, int count,
+ int *lens, const char **bufs);
+
enum {
ESERIOUS = 0x0001000
};
exp_req_replay_needed:1,
exp_lock_replay_needed:1,
exp_need_sync:1,
+ exp_keep_sync:1,
exp_flvr_changed:1,
exp_flvr_adapt:1,
exp_libclient:1, /* liblustre client? */
#define UPDATE_BUFFER_SIZE 8192
struct update_request {
struct dt_device *ur_dt;
- cfs_list_t ur_list; /* attached itself to thandle */
- int ur_flags;
- int ur_rc; /* request result */
- int ur_batchid; /* Current batch(trans) id */
- struct update_buf *ur_buf; /* Holding the update req */
+ struct list_head ur_list; /* attached itself to thandle */
+ int ur_flags;
+ int ur_rc; /* request result */
+ int ur_batchid; /* Current batch(trans) id */
+ struct update_buf *ur_buf; /* Holding the update req */
+ struct list_head ur_cb_items;
};
static inline unsigned long update_size(struct update *update)
reply->ur_lens[index] = data_len + sizeof(int);
}
-static inline int update_get_reply_buf(struct update_reply *reply, void **buf,
- int index)
+static inline int update_get_reply_buf(struct update_reply *reply,
+ struct lu_buf *lbuf, int index)
{
char *ptr;
int size = 0;
int result;
+ LASSERT(lbuf != NULL);
+
ptr = update_get_buf_internal(reply, index, &size);
LASSERT(ptr != NULL);
result = *(int *)ptr;
return result;
LASSERT(size >= sizeof(int));
- *buf = ptr + sizeof(int);
- return size - sizeof(int);
+
+ lbuf->lb_buf = ptr + sizeof(int);
+ lbuf->lb_len = size - sizeof(int);
+
+ return 0;
}
static inline int update_get_reply_result(struct update_reply *reply,
return *(int *)ptr;
}
-#endif
-
+/* Advance ur_batchid, the update request's current batch (trans) id,
+ * by one. */
+static inline void update_inc_batchid(struct update_request *update)
+{
+ update->ur_batchid++;
+}
+#endif
struct list_head llmd_mdt_phase2_list;
struct ptlrpc_thread llmd_thread;
- atomic_t llmd_rpcs_in_flight;
__u32 llmd_touch_gen;
int llmd_prefetched;
int llmd_assistant_status;
while (!list_empty(&llmd->llmd_req_list)) {
bool wakeup = false;
- l_wait_event(athread->t_ctl_waitq,
- bk->lb_async_windows == 0 ||
- atomic_read(&llmd->llmd_rpcs_in_flight) <
- bk->lb_async_windows ||
- llmd->llmd_exit,
- &lwi);
-
if (unlikely(llmd->llmd_exit))
GOTO(cleanup1, rc = llmd->llmd_post_result);
}
/* Wakeup the master engine if it is waiting in checkpoint. */
- if (atomic_read(&llmd->llmd_rpcs_in_flight) == 0)
- wake_up_all(&mthread->t_ctl_waitq);
+ wake_up_all(&mthread->t_ctl_waitq);
l_wait_event(athread->t_ctl_waitq,
!lfsck_layout_req_empty(llmd) ||
LASSERTF(llmd->llmd_prefetched == 0, "unmatched prefeteched objs %d\n",
llmd->llmd_prefetched);
- l_wait_event(athread->t_ctl_waitq,
- atomic_read(&llmd->llmd_rpcs_in_flight) == 0,
- &lwi);
-
cleanup2:
memset(lr, 0, sizeof(*lr));
lr->lr_index = lfsck_dev_idx(lfsck->li_bottom);
return 0;
l_wait_event(mthread->t_ctl_waitq,
- (list_empty(&llmd->llmd_req_list) &&
- atomic_read(&llmd->llmd_rpcs_in_flight) == 0) ||
+ list_empty(&llmd->llmd_req_list) ||
!thread_is_running(mthread) ||
thread_is_stopped(athread),
&lwi);
wake_up_all(&athread->t_ctl_waitq);
l_wait_event(mthread->t_ctl_waitq,
- (result > 0 && list_empty(&llmd->llmd_req_list) &&
- atomic_read(&llmd->llmd_rpcs_in_flight) == 0) ||
+ (result > 0 && list_empty(&llmd->llmd_req_list)) ||
thread_is_stopped(athread),
&lwi);
LASSERT(thread_is_init(&llmd->llmd_thread) ||
thread_is_stopped(&llmd->llmd_thread));
LASSERT(list_empty(&llmd->llmd_req_list));
- LASSERT(atomic_read(&llmd->llmd_rpcs_in_flight) == 0);
com->lc_data = NULL;
INIT_LIST_HEAD(&llmd->llmd_mdt_phase1_list);
INIT_LIST_HEAD(&llmd->llmd_mdt_phase2_list);
init_waitqueue_head(&llmd->llmd_thread.t_ctl_waitq);
- atomic_set(&llmd->llmd_rpcs_in_flight, 0);
com->lc_data = llmd;
} else {
struct lfsck_layout_slave_data *llsd;
rc = tgt_client_new(env, lexp);
if (rc == 0)
mdt_export_stats_init(obd, lexp, localdata);
+
+ /* For phase I, sync for cross-ref operation. */
+ lexp->exp_keep_sync = 1;
}
if (rc != 0) {
MODULES = osp
osp-objs = osp_dev.o osp_object.o osp_precreate.o osp_sync.o lproc_osp.o
-osp-objs += lwp_dev.o osp_md_object.o
+osp-objs += lwp_dev.o osp_md_object.o osp_trans.o
EXTRA_DIST = $(osp-objs:.o=.c) osp_internal.h
}
const struct dt_device_operations osp_dt_ops = {
- .dt_statfs = osp_statfs,
- .dt_sync = osp_sync,
- .dt_trans_start = osp_trans_start,
- .dt_trans_stop = osp_trans_stop,
+ .dt_statfs = osp_statfs,
+ .dt_sync = osp_sync,
+ .dt_trans_create = osp_trans_create,
+ .dt_trans_start = osp_trans_start,
+ .dt_trans_stop = osp_trans_stop,
};
static int osp_connect_to_osd(const struct lu_env *env, struct osp_device *m,
ENTRY;
+ mutex_init(&m->opd_async_requests_mutex);
obd = class_name2obd(lustre_cfg_string(cfg, 0));
if (obd == NULL) {
CERROR("Cannot find obd with name %s\n",
ENTRY;
+ if (m->opd_async_requests != NULL) {
+ out_destroy_update_req(m->opd_async_requests);
+ m->opd_async_requests = NULL;
+ }
+
if (m->opd_storage_exp)
obd_disconnect(m->opd_storage_exp);
.ldt_tags = LU_DEVICE_DT,
.ldt_name = LUSTRE_OSP_NAME,
.ldt_ops = &osp_device_type_ops,
- .ldt_ctx_tags = LCT_MD_THREAD
+ .ldt_ctx_tags = LCT_MD_THREAD | LCT_DT_THREAD,
};
static struct obd_ops osp_obd_device_ops = {
#include <dt_object.h>
#include <md_object.h>
#include <lustre_fid.h>
+#include <lustre_update.h>
+#include <lu_target.h>
/*
* Infrastructure to support tracking of last committed llog record
int opd_statfs_maxage;
cfs_proc_dir_entry_t *opd_symlink;
+
+ /* If the caller wants to do some idempotent async operations on
+ * a remote server, it can append the async remote requests to
+ * osp_device::opd_async_requests via the declare() functions. These
+ * requests can then be packed together and sent to the remote server
+ * in a single OUT RPC later. */
+ struct update_request *opd_async_requests;
+ /* Protect current operations on opd_async_requests. */
+ struct mutex opd_async_requests_mutex;
};
#define opd_pre_lock opd_pre->osp_pre_lock
extern struct kmem_cache *osp_object_kmem;
+/* oxe_buf holds the '\0'-terminated xattr name first; the bytes after
+ * the terminator hold the value, stored as raw binary data. */
+struct osp_xattr_entry {
+ struct list_head oxe_list; /* linkage on osp_object_attr::ooa_xattr_list */
+ atomic_t oxe_ref; /* reference count */
+ void *oxe_value; /* start of the value area inside oxe_buf */
+ int oxe_buflen; /* size of the whole allocation, header included */
+ int oxe_namelen; /* strlen() of the name, terminator excluded */
+ int oxe_vallen; /* presumably bytes of value stored - TODO confirm */
+ unsigned int oxe_exist:1, /* NOTE(review): meaning not visible here */
+ oxe_ready:1; /* NOTE(review): meaning not visible here */
+ char oxe_buf[0];
+};
+
+/* Per-object attribute container, referenced from osp_object::opo_ooa. */
+struct osp_object_attr {
+ struct lu_attr ooa_attr; /* presumably cached attributes - TODO confirm */
+ struct list_head ooa_xattr_list; /* osp_xattr_entry items, via oxe_list */
+};
+
/* this is a top object */
struct osp_object {
struct lu_object_header opo_header;
struct dt_object opo_obj;
unsigned int opo_reserved:1,
opo_new:1,
- opo_empty:1;
+ opo_empty:1,
+ opo_non_exist:1;
/* read/write lock for md osp object */
struct rw_semaphore opo_sem;
const struct lu_env *opo_owner;
+ struct osp_object_attr *opo_ooa;
+ /* Protect opo_ooa. */
+ spinlock_t opo_lock;
};
extern struct lu_object_operations osp_lu_obj_ops;
struct obdo osi_obdo;
};
+/* Return true when the device behind transaction handle th uses the OSP
+ * device operations table (osp_dt_ops). */
+static inline bool is_remote_trans(struct thandle *th)
+{
+ return th->th_dev->dd_ops == &osp_dt_ops;
+}
+
static inline void osp_objid_buf_prep(struct lu_buf *buf, loff_t *off,
__u32 *id, int index)
{
return imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_FID;
}
+/* Callback type taking the env, an update reply, an OSP object, an opaque
+ * data pointer, an index and a result code; it is the type of the final
+ * argument of osp_insert_async_update().
+ * NOTE(review): "interpterer" looks like a misspelling of "interpreter";
+ * it is kept because the name is part of the declared interface. */
+typedef int (*osp_async_update_interpterer_t)(const struct lu_env *env,
+ struct update_reply *reply,
+ struct osp_object *obj,
+ void *data, int index, int rc);
+
/* osp_dev.c */
void osp_update_last_id(struct osp_device *d, obd_id objid);
extern struct llog_operations osp_mds_ost_orig_logops;
-/* osp_md_object.c */
+/* osp_trans.c */
+struct update_request *
+osp_find_or_create_async_update_request(struct osp_device *osp);
+int osp_insert_async_update(const struct lu_env *env,
+ struct update_request *update, int op,
+ struct osp_object *obj, int count,
+ int *lens, const char **bufs, void *data,
+ osp_async_update_interpterer_t interpterer);
+int osp_unplug_async_update(const struct lu_env *env,
+ struct osp_device *osp,
+ struct update_request *update);
+struct thandle *osp_trans_create(const struct lu_env *env,
+ struct dt_device *d);
int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
struct thandle *th);
int osp_trans_stop(const struct lu_env *env, struct thandle *th);
+
+/* osp_object.c */
+int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
+ struct lu_attr *attr, struct lustre_capa *capa);
+int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
+ struct lu_buf *buf, const char *name,
+ struct lustre_capa *capa);
+int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, const char *name,
+ int flag, struct thandle *th);
+int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, const char *name, int fl,
+ struct thandle *th, struct lustre_capa *capa);
+int osp_declare_object_destroy(const struct lu_env *env,
+ struct dt_object *dt, struct thandle *th);
+int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
+ struct thandle *th);
+
/* osp_precreate.c */
int osp_init_precreate(struct osp_device *d);
int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d);
#define DEBUG_SUBSYSTEM S_MDS
#include <lustre_log.h>
-#include <lustre_update.h>
#include "osp_internal.h"
+
static const char dot[] = ".";
static const char dotdot[] = "..";
-static int osp_prep_update_req(const struct lu_env *env,
- struct osp_device *osp,
- struct update_buf *ubuf, int ubuf_len,
- struct ptlrpc_request **reqp)
-{
- struct obd_import *imp;
- struct ptlrpc_request *req;
- struct update_buf *tmp;
- int rc;
- ENTRY;
-
- imp = osp->opd_obd->u.cli.cl_import;
- LASSERT(imp);
-
- req = ptlrpc_request_alloc(imp, &RQF_UPDATE_OBJ);
- if (req == NULL)
- RETURN(-ENOMEM);
-
- req_capsule_set_size(&req->rq_pill, &RMF_UPDATE, RCL_CLIENT,
- UPDATE_BUFFER_SIZE);
-
- rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, UPDATE_OBJ);
- if (rc != 0) {
- ptlrpc_req_finished(req);
- RETURN(rc);
- }
-
- req_capsule_set_size(&req->rq_pill, &RMF_UPDATE_REPLY, RCL_SERVER,
- UPDATE_BUFFER_SIZE);
-
- tmp = req_capsule_client_get(&req->rq_pill, &RMF_UPDATE);
- memcpy(tmp, ubuf, ubuf_len);
-
- ptlrpc_request_set_replen(req);
-
- *reqp = req;
-
- RETURN(rc);
-}
-
-static int osp_remote_sync(const struct lu_env *env, struct dt_device *dt,
- struct update_request *update,
- struct ptlrpc_request **reqp)
-{
- struct osp_device *osp = dt2osp_dev(dt);
- struct ptlrpc_request *req = NULL;
- int rc;
- ENTRY;
-
- rc = osp_prep_update_req(env, osp, update->ur_buf, UPDATE_BUFFER_SIZE,
- &req);
- if (rc)
- RETURN(rc);
-
- /* Note: some dt index api might return non-zero result here, like
- * osd_index_ea_lookup, so we should only check rc < 0 here */
- rc = ptlrpc_queue_wait(req);
- if (rc < 0) {
- ptlrpc_req_finished(req);
- update->ur_rc = rc;
- RETURN(rc);
- }
-
- if (reqp != NULL) {
- *reqp = req;
- RETURN(rc);
- }
-
- update->ur_rc = rc;
-
- ptlrpc_req_finished(req);
-
- RETURN(rc);
-}
-
-/**
- * Create a new update request for the device.
- */
-static struct update_request
-*osp_create_update_req(struct dt_device *dt)
-{
- struct update_request *update;
-
- OBD_ALLOC_PTR(update);
- if (!update)
- return ERR_PTR(-ENOMEM);
-
- OBD_ALLOC_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
- if (update->ur_buf == NULL) {
- OBD_FREE_PTR(update);
- return ERR_PTR(-ENOMEM);
- }
-
- CFS_INIT_LIST_HEAD(&update->ur_list);
- update->ur_dt = dt;
- update->ur_batchid = 0;
- update->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC;
- update->ur_buf->ub_count = 0;
-
- return update;
-}
-
-static void osp_destroy_update_req(struct update_request *update)
-{
- if (update == NULL)
- return;
-
- cfs_list_del(&update->ur_list);
- if (update->ur_buf != NULL)
- OBD_FREE_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
-
- OBD_FREE_PTR(update);
- return;
-}
-
-int osp_trans_stop(const struct lu_env *env, struct thandle *th)
-{
- int rc = 0;
-
- rc = th->th_current_request->ur_rc;
- osp_destroy_update_req(th->th_current_request);
- th->th_current_request = NULL;
-
- return rc;
-}
-
-/**
- * In DNE phase I, all remote updates will be packed into RPC (the format
- * description is in lustre_idl.h) during declare phase, all of updates
- * are attached to the transaction, one entry per OSP. Then in trans start,
- * LOD will walk through these entries and send these UPDATEs to the remote
- * MDT to be executed synchronously.
- */
-int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
- struct thandle *th)
-{
- struct update_request *update;
- int rc = 0;
-
- /* In phase I, if the transaction includes remote updates, the local
- * update should be synchronized, so it will set th_sync = 1 */
- update = th->th_current_request;
- LASSERT(update != NULL && update->ur_dt == dt);
- if (update->ur_buf->ub_count > 0) {
- rc = osp_remote_sync(env, dt, update, NULL);
- th->th_sync = 1;
- }
-
- RETURN(rc);
-}
-
-/**
- * Insert the update into the th_bufs for the device.
- */
-static int osp_insert_update(const struct lu_env *env,
- struct update_request *update, int op,
- struct lu_fid *fid, int count,
- int *lens, char **bufs)
-{
- struct update_buf *ubuf = update->ur_buf;
- struct update *obj_update;
- char *ptr;
- int i;
- int update_length;
- int rc = 0;
- ENTRY;
-
- obj_update = (struct update *)((char *)ubuf +
- cfs_size_round(update_buf_size(ubuf)));
-
- /* Check update size to make sure it can fit into the buffer */
- update_length = cfs_size_round(offsetof(struct update,
- u_bufs[0]));
- for (i = 0; i < count; i++)
- update_length += cfs_size_round(lens[i]);
-
- if (cfs_size_round(update_buf_size(ubuf)) + update_length >
- UPDATE_BUFFER_SIZE || ubuf->ub_count >= UPDATE_MAX_OPS) {
- CERROR("%s: insert up %p, idx %d cnt %d len %lu: rc = %d\n",
- update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf,
- update_length, ubuf->ub_count, update_buf_size(ubuf),
- -E2BIG);
- RETURN(-E2BIG);
- }
-
- if (count > UPDATE_BUF_COUNT) {
- CERROR("%s: Insert too much params %d "DFID" op %d: rc = %d\n",
- update->ur_dt->dd_lu_dev.ld_obd->obd_name, count,
- PFID(fid), op, -E2BIG);
- RETURN(-E2BIG);
- }
-
- /* fill the update into the update buffer */
- fid_cpu_to_le(&obj_update->u_fid, fid);
- obj_update->u_type = cpu_to_le32(op);
- obj_update->u_batchid = update->ur_batchid;
- for (i = 0; i < count; i++)
- obj_update->u_lens[i] = cpu_to_le32(lens[i]);
-
- ptr = (char *)obj_update +
- cfs_size_round(offsetof(struct update, u_bufs[0]));
- for (i = 0; i < count; i++)
- LOGL(bufs[i], lens[i], ptr);
-
- ubuf->ub_count++;
-
- CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%lu\n",
- update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf, PFID(fid),
- ubuf->ub_count, op, count, update_buf_size(ubuf));
-
- RETURN(rc);
-}
-
-static struct update_request
-*osp_find_update(struct thandle *th, struct dt_device *dt_dev)
-{
- struct update_request *update;
-
- /* Because transaction api does not proivde the interface
- * to transfer the update from LOD to OSP, we need walk
- * remote update list to find the update, this probably
- * should move to LOD layer, when update can be part of
- * the trancation api parameter. XXX */
- cfs_list_for_each_entry(update, &th->th_remote_update_list, ur_list) {
- if (update->ur_dt == dt_dev)
- return update;
- }
- return NULL;
-}
-
-static inline void osp_md_add_update_batchid(struct update_request *update)
-{
- update->ur_batchid++;
-}
-
-/**
- * Find one loc in th_dev/dev_obj_update for the update,
- * Because only one thread can access this thandle, no need
- * lock now.
- */
-static struct update_request
-*osp_find_create_update_loc(struct thandle *th, struct dt_object *dt)
-{
- struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
- struct update_request *update;
- ENTRY;
-
- update = osp_find_update(th, dt_dev);
- if (update != NULL)
- RETURN(update);
-
- update = osp_create_update_req(dt_dev);
- if (IS_ERR(update))
- RETURN(update);
-
- cfs_list_add_tail(&update->ur_list, &th->th_remote_update_list);
-
- RETURN(update);
-}
-
-static int osp_get_attr_from_req(const struct lu_env *env,
- struct ptlrpc_request *req,
- struct lu_attr *attr, int index)
-{
- struct update_reply *reply;
- struct obdo *lobdo = &osp_env_info(env)->osi_obdo;
- struct obdo *wobdo;
- int size;
-
- LASSERT(attr != NULL);
-
- reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
- UPDATE_BUFFER_SIZE);
- if (reply == NULL || reply->ur_version != UPDATE_REPLY_V1)
- return -EPROTO;
-
- size = update_get_reply_buf(reply, (void **)&wobdo, index);
- if (size != sizeof(struct obdo))
- return -EPROTO;
-
- obdo_le_to_cpu(wobdo, wobdo);
- lustre_get_wire_obdo(NULL, lobdo, wobdo);
- la_from_obdo(attr, lobdo, lobdo->o_valid);
-
- return 0;
-}
-
static int osp_md_declare_object_create(const struct lu_env *env,
struct dt_object *dt,
struct lu_attr *attr,
int buf_count;
int rc;
- update = osp_find_create_update_loc(th, dt);
+ update = out_find_create_update_loc(th, dt);
if (IS_ERR(update)) {
CERROR("%s: Get OSP update buf failed: rc = %d\n",
dt->do_lu.lo_dev->ld_obd->obd_name,
CDEBUG(D_HA, "%s: object "DFID" exists, destroy this orphan\n",
dt->do_lu.lo_dev->ld_obd->obd_name, PFID(fid1));
- rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1, 0,
+ rc = out_insert_update(env, update, OBJ_REF_DEL, fid1, 0,
NULL, NULL);
if (rc != 0)
GOTO(out, rc);
if (S_ISDIR(lu_object_attr(&dt->do_lu))) {
/* decrease for ".." */
- rc = osp_insert_update(env, update, OBJ_REF_DEL, fid1,
+ rc = out_insert_update(env, update, OBJ_REF_DEL, fid1,
0, NULL, NULL);
if (rc != 0)
GOTO(out, rc);
}
- rc = osp_insert_update(env, update, OBJ_DESTROY, fid1, 0, NULL,
+ rc = out_insert_update(env, update, OBJ_DESTROY, fid1, 0, NULL,
NULL);
if (rc != 0)
GOTO(out, rc);
dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS;
/* Increase batchid to add this orphan object deletion
* to separate transaction */
- osp_md_add_update_batchid(update);
+ update_inc_batchid(update);
}
- rc = osp_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes,
- bufs);
+ rc = out_insert_update(env, update, OBJ_CREATE, fid1, buf_count, sizes,
+ (const char **)bufs);
out:
if (rc)
CERROR("%s: Insert update error: rc = %d\n",
struct lu_fid *fid;
int rc;
- update = osp_find_create_update_loc(th, dt);
+ update = out_find_create_update_loc(th, dt);
if (IS_ERR(update)) {
CERROR("%s: Get OSP update buf failed: rc = %d\n",
dt->do_lu.lo_dev->ld_obd->obd_name,
fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
- rc = osp_insert_update(env, update, OBJ_REF_DEL, fid, 0, NULL, NULL);
+ rc = out_insert_update(env, update, OBJ_REF_DEL, fid, 0, NULL, NULL);
return rc;
}
struct lu_fid *fid;
int rc;
- update = osp_find_create_update_loc(th, dt);
+ update = out_find_create_update_loc(th, dt);
if (IS_ERR(update)) {
CERROR("%s: Get OSP update buf failed: rc = %d\n",
dt->do_lu.lo_dev->ld_obd->obd_name,
fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
- rc = osp_insert_update(env, update, OBJ_REF_ADD, fid, 0, NULL, NULL);
+ rc = out_insert_update(env, update, OBJ_REF_ADD, fid, 0, NULL, NULL);
return rc;
}
char *buf;
int rc;
- update = osp_find_create_update_loc(th, dt);
+ update = out_find_create_update_loc(th, dt);
if (IS_ERR(update)) {
CERROR("%s: Get OSP update buf failed: %d\n",
dt->do_lu.lo_dev->ld_obd->obd_name,
buf = (char *)&osi->osi_obdo;
fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
- rc = osp_insert_update(env, update, OBJ_ATTR_SET, fid, 1, &size, &buf);
+ rc = out_insert_update(env, update, OBJ_ATTR_SET, fid, 1, &size,
+ (const char **)&buf);
return rc;
}
RETURN(0);
}
-static int osp_md_declare_xattr_set(const struct lu_env *env,
- struct dt_object *dt,
- const struct lu_buf *buf,
- const char *name, int flag,
- struct thandle *th)
-{
- struct update_request *update;
- struct lu_fid *fid;
- int sizes[3] = {strlen(name), buf->lb_len,
- sizeof(int)};
- char *bufs[3] = {(char *)name, (char *)buf->lb_buf };
- int rc;
-
- LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
- update = osp_find_create_update_loc(th, dt);
- if (IS_ERR(update)) {
- CERROR("%s: Get OSP update buf failed: rc = %d\n",
- dt->do_lu.lo_dev->ld_obd->obd_name,
- (int)PTR_ERR(update));
- return PTR_ERR(update);
- }
-
- flag = cpu_to_le32(flag);
- bufs[2] = (char *)&flag;
-
- fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
- rc = osp_insert_update(env, update, OBJ_XATTR_SET, fid,
- ARRAY_SIZE(sizes), sizes, bufs);
-
- return rc;
-}
-
-static int osp_md_xattr_set(const struct lu_env *env, struct dt_object *dt,
- const struct lu_buf *buf, const char *name, int fl,
- struct thandle *th, struct lustre_capa *capa)
-{
- CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name,
- PFID(&dt->do_lu.lo_header->loh_fid));
-
- return 0;
-}
-
-static int osp_md_xattr_get(const struct lu_env *env, struct dt_object *dt,
- struct lu_buf *buf, const char *name,
- struct lustre_capa *capa)
-{
- struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
- struct update_request *update = NULL;
- struct ptlrpc_request *req = NULL;
- int rc;
- int buf_len;
- int size;
- struct update_reply *reply;
- void *ea_buf;
- ENTRY;
-
- /* Because it needs send the update buffer right away,
- * just create an update buffer, instead of attaching the
- * update_remote list of the thandle.
- */
- update = osp_create_update_req(dt_dev);
- if (IS_ERR(update))
- RETURN(PTR_ERR(update));
-
- LASSERT(name != NULL);
- buf_len = strlen(name);
- rc = osp_insert_update(env, update, OBJ_XATTR_GET,
- (struct lu_fid *)lu_object_fid(&dt->do_lu),
- 1, &buf_len, (char **)&name);
- if (rc != 0) {
- CERROR("%s: Insert update error: rc = %d\n",
- dt->do_lu.lo_dev->ld_obd->obd_name, rc);
- GOTO(out, rc);
- }
- dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
-
- rc = osp_remote_sync(env, dt_dev, update, &req);
- if (rc != 0)
- GOTO(out, rc);
-
- reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
- UPDATE_BUFFER_SIZE);
- if (reply->ur_version != UPDATE_REPLY_V1) {
- CERROR("%s: Wrong version %x expected %x: rc = %d\n",
- dt_dev->dd_lu_dev.ld_obd->obd_name,
- reply->ur_version, UPDATE_REPLY_V1, -EPROTO);
- GOTO(out, rc = -EPROTO);
- }
-
- size = update_get_reply_buf(reply, &ea_buf, 0);
- if (size < 0)
- GOTO(out, rc = size);
-
- LASSERT(size > 0 && size < PAGE_CACHE_SIZE);
- LASSERT(ea_buf != NULL);
-
- rc = size;
- if (buf->lb_buf != NULL)
- memcpy(buf->lb_buf, ea_buf, size);
-out:
- if (req != NULL)
- ptlrpc_req_finished(req);
-
- osp_destroy_update_req(update);
-
- RETURN(rc);
-}
-
static void osp_md_object_read_lock(const struct lu_env *env,
struct dt_object *dt, unsigned role)
{
struct dt_rec *rec, const struct dt_key *key,
struct lustre_capa *capa)
{
- struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
+ struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2;
+ struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev);
+ struct dt_device *dt_dev = &osp->opd_dt_dev;
struct update_request *update;
struct ptlrpc_request *req = NULL;
int size = strlen((char *)key) + 1;
- char *name = (char *)key;
int rc;
struct update_reply *reply;
struct lu_fid *fid;
* just create an update buffer, instead of attaching the
* update_remote list of the thandle.
*/
- update = osp_create_update_req(dt_dev);
+ update = out_create_update_req(dt_dev);
if (IS_ERR(update))
RETURN(PTR_ERR(update));
- rc = osp_insert_update(env, update, OBJ_INDEX_LOOKUP,
- (struct lu_fid *)lu_object_fid(&dt->do_lu),
- 1, &size, (char **)&name);
+ rc = out_insert_update(env, update, OBJ_INDEX_LOOKUP,
+ lu_object_fid(&dt->do_lu), 1, &size,
+ (const char **)&key);
if (rc) {
CERROR("%s: Insert update error: rc = %d\n",
dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
GOTO(out, rc);
}
- rc = osp_remote_sync(env, dt_dev, update, &req);
+ rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
if (rc < 0)
GOTO(out, rc);
GOTO(out, rc);
}
- size = update_get_reply_buf(reply, (void **)&fid, 0);
- if (size < 0)
- GOTO(out, rc = size);
+ rc = update_get_reply_buf(reply, lbuf, 0);
+ if (rc < 0)
+ GOTO(out, rc);
- if (size != sizeof(struct lu_fid)) {
- CERROR("%s: lookup "DFID" %s wrong size %d: rc = %d\n",
+ if (lbuf->lb_len != sizeof(*fid)) {
+ CERROR("%s: lookup "DFID" %s wrong size %d\n",
dt_dev->dd_lu_dev.ld_obd->obd_name,
- PFID(lu_object_fid(&dt->do_lu)), (char *)key, size, rc);
+ PFID(lu_object_fid(&dt->do_lu)), (char *)key,
+ (int)lbuf->lb_len);
GOTO(out, rc = -EINVAL);
}
+ fid = lbuf->lb_buf;
fid_le_to_cpu(fid, fid);
if (!fid_is_sane(fid)) {
- CERROR("%s: lookup "DFID" %s invalid fid "DFID": rc = %d\n",
+ CERROR("%s: lookup "DFID" %s invalid fid "DFID"\n",
dt_dev->dd_lu_dev.ld_obd->obd_name,
- PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid),
- rc);
+ PFID(lu_object_fid(&dt->do_lu)), (char *)key, PFID(fid));
GOTO(out, rc = -EINVAL);
}
+
memcpy(rec, fid, sizeof(*fid));
+
+ GOTO(out, rc = 1);
+
out:
if (req != NULL)
ptlrpc_req_finished(req);
- osp_destroy_update_req(update);
+ out_destroy_update_req(update);
- RETURN(rc);
+ return rc;
}
static int osp_md_declare_insert(const struct lu_env *env,
struct lu_fid *rec_fid = (struct lu_fid *)rec;
int size[2] = {strlen((char *)key) + 1,
sizeof(*rec_fid)};
- char *bufs[2] = {(char *)key, (char *)rec_fid};
+ const char *bufs[2] = {(char *)key, (char *)rec_fid};
int rc;
- update = osp_find_create_update_loc(th, dt);
+ update = out_find_create_update_loc(th, dt);
if (IS_ERR(update)) {
CERROR("%s: Get OSP update buf failed: rc = %d\n",
dt->do_lu.lo_dev->ld_obd->obd_name,
fid_cpu_to_le(rec_fid, rec_fid);
- rc = osp_insert_update(env, update, OBJ_INDEX_INSERT, fid,
+ rc = out_insert_update(env, update, OBJ_INDEX_INSERT, fid,
ARRAY_SIZE(size), size, bufs);
return rc;
}
struct update_request *update;
struct lu_fid *fid;
int size = strlen((char *)key) + 1;
- char *buf = (char *)key;
int rc;
- update = osp_find_create_update_loc(th, dt);
+ update = out_find_create_update_loc(th, dt);
if (IS_ERR(update)) {
CERROR("%s: Get OSP update buf failed: rc = %d\n",
dt->do_lu.lo_dev->ld_obd->obd_name,
fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
- rc = osp_insert_update(env, update, OBJ_INDEX_DELETE, fid, 1, &size,
- &buf);
+ rc = out_insert_update(env, update, OBJ_INDEX_DELETE, fid, 1, &size,
+ (const char **)&key);
return rc;
}
*
* Note: for OSP, these index iterate api is only used to check
* whether the directory is empty now (see mdd_dir_is_empty).
- * Since dir_empty will be return by OBJ_ATTR_GET(see osp_md_attr_get/
+ * Since dir_empty will be return by OBJ_ATTR_GET(see osp_attr_get/
* out_attr_get). So the implementation of these iterator is simplied
* to make mdd_dir_is_empty happy. The real iterator should be
* implemented, if we need it one day.
return 0;
}
-static int osp_md_attr_get(const struct lu_env *env,
- struct dt_object *dt, struct lu_attr *attr,
- struct lustre_capa *capa)
-{
- struct osp_object *obj = dt2osp_obj(dt);
- struct dt_device *dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
- struct update_request *update = NULL;
- struct ptlrpc_request *req = NULL;
- int rc;
- ENTRY;
-
- /* Because it needs send the update buffer right away,
- * just create an update buffer, instead of attaching the
- * update_remote list of the thandle.
- */
- update = osp_create_update_req(dt_dev);
- if (IS_ERR(update))
- RETURN(PTR_ERR(update));
-
- rc = osp_insert_update(env, update, OBJ_ATTR_GET,
- (struct lu_fid *)lu_object_fid(&dt->do_lu),
- 0, NULL, NULL);
- if (rc) {
- CERROR("%s: Insert update error: rc = %d\n",
- dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
- GOTO(out, rc);
- }
- dt_dev = lu2dt_dev(dt->do_lu.lo_dev);
-
- rc = osp_remote_sync(env, dt_dev, update, &req);
- if (rc < 0)
- GOTO(out, rc);
-
- rc = osp_get_attr_from_req(env, req, attr, 0);
- if (rc)
- GOTO(out, rc);
-
- if (attr->la_flags == 1)
- obj->opo_empty = 0;
- else
- obj->opo_empty = 1;
-out:
- if (req != NULL)
- ptlrpc_req_finished(req);
-
- osp_destroy_update_req(update);
-
- RETURN(rc);
-}
-
-static int osp_md_declare_object_destroy(const struct lu_env *env,
- struct dt_object *dt,
- struct thandle *th)
-{
- struct osp_object *o = dt2osp_obj(dt);
- int rc = 0;
- ENTRY;
-
- /*
- * track objects to be destroyed via llog
- */
- rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th);
-
- RETURN(rc);
-}
-
-static int osp_md_object_destroy(const struct lu_env *env,
- struct dt_object *dt, struct thandle *th)
-{
- struct osp_object *o = dt2osp_obj(dt);
- int rc = 0;
- ENTRY;
-
- /*
- * once transaction is committed put proper command on
- * the queue going to our OST
- */
- rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL);
-
- /* not needed in cache any more */
- set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
-
- RETURN(rc);
-}
-
static int osp_md_object_lock(const struct lu_env *env,
struct dt_object *dt,
struct lustre_handle *lh,
.do_ref_add = osp_md_object_ref_add,
.do_declare_ref_del = osp_md_declare_object_ref_del,
.do_ref_del = osp_md_object_ref_del,
- .do_declare_destroy = osp_md_declare_object_destroy,
- .do_destroy = osp_md_object_destroy,
+ .do_declare_destroy = osp_declare_object_destroy,
+ .do_destroy = osp_object_destroy,
.do_ah_init = osp_md_ah_init,
- .do_attr_get = osp_md_attr_get,
+ .do_attr_get = osp_attr_get,
.do_declare_attr_set = osp_md_declare_attr_set,
.do_attr_set = osp_md_attr_set,
- .do_declare_xattr_set = osp_md_declare_xattr_set,
- .do_xattr_set = osp_md_xattr_set,
- .do_xattr_get = osp_md_xattr_get,
+ .do_xattr_get = osp_xattr_get,
+ .do_declare_xattr_set = osp_declare_xattr_set,
+ .do_xattr_set = osp_xattr_set,
.do_index_try = osp_md_index_try,
.do_object_lock = osp_md_object_lock,
};
#include "osp_internal.h"
+/* Return true when the OSP device that owns lo does not have
+ * opd_connect_mdt set.
+ * NOTE(review): presumably that means the device talks to an OST rather
+ * than an MDT - confirm against where opd_connect_mdt is assigned. */
+static inline bool is_ost_obj(struct lu_object *lo)
+{
+ return !lu2osp_dev(lo->lo_dev)->opd_connect_mdt;
+}
+
static void osp_object_assign_fid(const struct lu_env *env,
struct osp_device *d, struct osp_object *o)
{
lu_object_assign_fid(env, &o->opo_obj.do_lu, &osi->osi_fid);
}
+/*
+ * Make sure \a obj owns an attribute cache (opo_ooa).
+ *
+ * A cache is allocated outside the lock and installed under opo_lock only
+ * if nobody installed one in the meantime; the spare copy is released
+ * otherwise.
+ *
+ * \retval 0		a cache is attached on return
+ * \retval -ENOMEM	the cache could not be allocated
+ */
+static int osp_oac_init(struct osp_object *obj)
+{
+	struct osp_object_attr	*fresh;
+	bool			 installed = false;
+
+	OBD_ALLOC_PTR(fresh);
+	if (fresh == NULL)
+		return -ENOMEM;
+
+	INIT_LIST_HEAD(&fresh->ooa_xattr_list);
+
+	spin_lock(&obj->opo_lock);
+	if (likely(obj->opo_ooa == NULL)) {
+		obj->opo_ooa = fresh;
+		installed = true;
+	}
+	spin_unlock(&obj->opo_lock);
+
+	/* Lost the race against a concurrent initializer. */
+	if (!installed)
+		OBD_FREE_PTR(fresh);
+
+	return 0;
+}
+
+/*
+ * Walk the xattr cache list of \a ooa looking for an entry whose name is
+ * exactly the first \a namelen bytes of \a name.
+ *
+ * With \a unlink set the match is removed from the list and no reference
+ * is taken; otherwise the match stays linked and gains one reference.
+ * Every caller in this file invokes it with the object's opo_lock held.
+ *
+ * \retval the matching entry, or NULL if the name is not cached
+ */
+static struct osp_xattr_entry *
+osp_oac_xattr_find_locked(struct osp_object_attr *ooa,
+			  const char *name, int namelen, bool unlink)
+{
+	struct osp_xattr_entry *cur;
+
+	list_for_each_entry(cur, &ooa->ooa_xattr_list, oxe_list) {
+		if (namelen != cur->oxe_namelen)
+			continue;
+
+		if (strncmp(name, cur->oxe_buf, namelen) != 0)
+			continue;
+
+		if (!unlink)
+			atomic_inc(&cur->oxe_ref);
+		else
+			list_del_init(&cur->oxe_list);
+
+		return cur;
+	}
+
+	return NULL;
+}
+
+/*
+ * Look \a name up in the xattr cache of \a obj under opo_lock.
+ *
+ * \retval the cached entry with one extra reference for the caller
+ * \retval NULL when the object has no cache or the name is not cached
+ */
+static struct osp_xattr_entry *osp_oac_xattr_find(struct osp_object *obj,
+						  const char *name)
+{
+	struct osp_xattr_entry *found;
+
+	spin_lock(&obj->opo_lock);
+	found = obj->opo_ooa == NULL ? NULL :
+		osp_oac_xattr_find_locked(obj->opo_ooa, name, strlen(name),
+					  false);
+	spin_unlock(&obj->opo_lock);
+
+	return found;
+}
+
+/*
+ * Return the cache entry for xattr \a name, creating one with room for
+ * \a len value bytes when the name is not cached yet.
+ *
+ * The returned entry carries one reference for the caller, to be dropped
+ * with osp_oac_xattr_put(). The object must already have a cache
+ * (opo_ooa), which is asserted.
+ *
+ * \retval the existing or newly linked entry
+ * \retval NULL if a new entry could not be allocated
+ */
+static struct osp_xattr_entry *
+osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, int len)
+{
+ struct osp_object_attr *ooa = obj->opo_ooa;
+ struct osp_xattr_entry *oxe;
+ struct osp_xattr_entry *tmp = NULL;
+ int namelen = strlen(name);
+ /* header + name + one separator byte + value space */
+ int size = sizeof(*oxe) + namelen + 1 + len;
+
+ LASSERT(ooa != NULL);
+
+ /* Fast path: the name is already cached. */
+ oxe = osp_oac_xattr_find(obj, name);
+ if (oxe != NULL)
+ return oxe;
+
+ OBD_ALLOC(oxe, size);
+ if (unlikely(oxe == NULL))
+ return NULL;
+
+ INIT_LIST_HEAD(&oxe->oxe_list);
+ oxe->oxe_buflen = size;
+ oxe->oxe_namelen = namelen;
+ /* NOTE(review): the byte after the name is not written here;
+ * presumably OBD_ALLOC zero-fills -- confirm. */
+ memcpy(oxe->oxe_buf, name, namelen);
+ oxe->oxe_value = oxe->oxe_buf + namelen + 1;
+ /* One ref is for the caller, the other is for the entry on the list. */
+ atomic_set(&oxe->oxe_ref, 2);
+
+ /* Re-check under the lock: the allocation above ran unlocked, so
+ * another thread may have inserted the same name meanwhile. */
+ spin_lock(&obj->opo_lock);
+ tmp = osp_oac_xattr_find_locked(ooa, name, namelen, false);
+ if (tmp == NULL)
+ list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
+ spin_unlock(&obj->opo_lock);
+
+ /* Somebody else won: discard our copy and hand out theirs. */
+ if (tmp != NULL) {
+ OBD_FREE(oxe, size);
+ oxe = tmp;
+ }
+
+ return oxe;
+}
+
+/*
+ * Swap the cache entry named like \a *poxe for a fresh one able to hold
+ * \a len value bytes.
+ *
+ * On success the entry of that name currently on the list is unlinked and
+ * stored back into \a *poxe (it still holds the reference the list owned,
+ * which the caller is expected to drop), and the new entry is returned
+ * with two references: one for the caller, one for the list.
+ *
+ * \retval the new entry
+ * \retval NULL on allocation failure; \a *poxe is left untouched
+ */
+static struct osp_xattr_entry *
+osp_oac_xattr_replace(struct osp_object *obj,
+ struct osp_xattr_entry **poxe, int len)
+{
+ struct osp_object_attr *ooa = obj->opo_ooa;
+ struct osp_xattr_entry *old = *poxe;
+ struct osp_xattr_entry *oxe;
+ struct osp_xattr_entry *tmp = NULL;
+ int namelen = old->oxe_namelen;
+ /* header + name + one separator byte + value space */
+ int size = sizeof(*oxe) + namelen + 1 + len;
+
+ LASSERT(ooa != NULL);
+
+ OBD_ALLOC(oxe, size);
+ if (unlikely(oxe == NULL))
+ return NULL;
+
+ INIT_LIST_HEAD(&oxe->oxe_list);
+ oxe->oxe_buflen = size;
+ oxe->oxe_namelen = namelen;
+ memcpy(oxe->oxe_buf, old->oxe_buf, namelen);
+ oxe->oxe_value = oxe->oxe_buf + namelen + 1;
+ /* One ref is for the caller, the other is for the entry on the list. */
+ atomic_set(&oxe->oxe_ref, 2);
+
+ /* Unlink whichever entry of this name is listed now and link the
+ * replacement in the same critical section. */
+ spin_lock(&obj->opo_lock);
+ tmp = osp_oac_xattr_find_locked(ooa, oxe->oxe_buf, namelen, true);
+ list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
+ spin_unlock(&obj->opo_lock);
+
+ /* The name is expected to be on the list; a miss is a fatal bug. */
+ *poxe = tmp;
+ LASSERT(tmp != NULL);
+
+ return oxe;
+}
+
+/*
+ * Drop one reference on \a oxe and free the entry once the count reaches
+ * zero. An entry losing its last reference must no longer be linked on a
+ * cache list (asserted).
+ */
+static inline void osp_oac_xattr_put(struct osp_xattr_entry *oxe)
+{
+	if (!atomic_dec_and_test(&oxe->oxe_ref))
+		return;
+
+	LASSERT(list_empty(&oxe->oxe_list));
+	OBD_FREE(oxe, oxe->oxe_buflen);
+}
+
+/*
+ * Decode the obdo stored in slot \a index of \a reply into lu_attr form.
+ *
+ * If the object has an attribute cache (opo_ooa) the cache is refreshed
+ * and, when \a attr is non-NULL, the cached attributes are copied out.
+ * Without a cache \a attr must be non-NULL and is filled directly.
+ *
+ * \retval 0		success
+ * \retval negative	error returned by update_get_reply_buf()
+ * \retval -EPROTO	the reply slot is not exactly one struct obdo
+ */
+static int osp_get_attr_from_reply(const struct lu_env *env,
+ struct update_reply *reply,
+ struct lu_attr *attr,
+ struct osp_object *obj, int index)
+{
+ struct osp_thread_info *osi = osp_env_info(env);
+ struct lu_buf *rbuf = &osi->osi_lb2;
+ struct obdo *lobdo = &osi->osi_obdo;
+ struct obdo *wobdo;
+ int rc;
+
+ rc = update_get_reply_buf(reply, rbuf, index);
+ if (rc < 0)
+ return rc;
+
+ wobdo = rbuf->lb_buf;
+ if (rbuf->lb_len != sizeof(*wobdo))
+ return -EPROTO;
+
+ /* Convert the wire obdo in place, then into the per-thread copy. */
+ obdo_le_to_cpu(wobdo, wobdo);
+ lustre_get_wire_obdo(NULL, lobdo, wobdo);
+ /* opo_lock guards the cached attributes against concurrent readers. */
+ spin_lock(&obj->opo_lock);
+ if (obj->opo_ooa != NULL) {
+ la_from_obdo(&obj->opo_ooa->ooa_attr, lobdo, lobdo->o_valid);
+ if (attr != NULL)
+ *attr = obj->opo_ooa->ooa_attr;
+ } else {
+ LASSERT(attr != NULL);
+
+ la_from_obdo(attr, lobdo, lobdo->o_valid);
+ }
+ spin_unlock(&obj->opo_lock);
+
+ return 0;
+}
+
+/*
+ * Completion callback for the OBJ_ATTR_GET update queued by
+ * osp_declare_attr_get().
+ *
+ * \a data is the lu_attr that was registered with the update and \a rc is
+ * the result of that single update. On success the object is flagged as
+ * existing and the cached attributes are refreshed from slot \a index of
+ * \a reply. On failure the attributes are invalidated and, for -ENOENT,
+ * the object is flagged as non-existent.
+ *
+ * \retval 0 on any failed update, otherwise the result of
+ *	   osp_get_attr_from_reply()
+ */
+static int osp_attr_get_interpterer(const struct lu_env *env,
+				    struct update_reply *reply,
+				    struct osp_object *obj,
+				    void *data, int index, int rc)
+{
+	struct lu_attr *cached = data;
+
+	LASSERT(obj->opo_ooa != NULL);
+
+	if (rc != 0) {
+		if (rc == -ENOENT) {
+			osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS;
+			obj->opo_non_exist = 1;
+		}
+
+		/* Whatever was cached can no longer be trusted. */
+		spin_lock(&obj->opo_lock);
+		cached->la_valid = 0;
+		spin_unlock(&obj->opo_lock);
+
+		return 0;
+	}
+
+	osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS;
+	obj->opo_non_exist = 0;
+
+	return osp_get_attr_from_reply(env, reply, NULL, obj, index);
+}
+
+/*
+ * Queue an asynchronous OBJ_ATTR_GET for \a dt on the device-wide batched
+ * update request, creating the object's attribute cache first if needed.
+ * The reply is handled by osp_attr_get_interpterer(), which receives the
+ * cached lu_attr as its data. \a capa is not used.
+ *
+ * \retval 0		the update was queued
+ * \retval negative	cache allocation, request creation or insertion failed
+ */
+static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt,
+				struct lustre_capa *capa)
+{
+	struct osp_device	*osp = lu2osp_dev(dt->do_lu.lo_dev);
+	struct osp_object	*obj = dt2osp_obj(dt);
+	struct update_request	*batch;
+	int			 rc;
+
+	if (obj->opo_ooa == NULL) {
+		rc = osp_oac_init(obj);
+		if (rc != 0)
+			return rc;
+	}
+
+	mutex_lock(&osp->opd_async_requests_mutex);
+	batch = osp_find_or_create_async_update_request(osp);
+	if (!IS_ERR(batch))
+		rc = osp_insert_async_update(env, batch, OBJ_ATTR_GET, obj, 0,
+					     NULL, NULL,
+					     &obj->opo_ooa->ooa_attr,
+					     osp_attr_get_interpterer);
+	else
+		rc = PTR_ERR(batch);
+	mutex_unlock(&osp->opd_async_requests_mutex);
+
+	return rc;
+}
+
+/*
+ * Return the standard attributes of \a dt in \a attr.
+ *
+ * Cached attributes are used when the object has a cache whose la_valid
+ * is non-zero. Otherwise a synchronous OBJ_ATTR_GET update is sent, the
+ * reply is decoded by osp_get_attr_from_reply() and the object's
+ * existence flags are updated from the outcome. \a capa is not used.
+ *
+ * \retval 0		\a attr has been filled
+ * \retval -ENOENT	the object is known, or was just found, not to exist
+ * \retval -EPROTO	the reply is missing or has an unexpected version
+ * \retval negative	other errors from building or sending the update
+ */
+int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
+ struct lu_attr *attr, struct lustre_capa *capa)
+{
+ struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev);
+ struct osp_object *obj = dt2osp_obj(dt);
+ struct dt_device *dev = &osp->opd_dt_dev;
+ struct update_request *update;
+ struct update_reply *reply;
+ struct ptlrpc_request *req = NULL;
+ int rc = 0;
+ ENTRY;
+
+ /* Only OST objects short-circuit on the cached non-existence flag. */
+ if (is_ost_obj(&dt->do_lu) && obj->opo_non_exist)
+ RETURN(-ENOENT);
+
+ /* Serve from the attribute cache when it holds valid data. */
+ if (obj->opo_ooa != NULL) {
+ spin_lock(&obj->opo_lock);
+ if (obj->opo_ooa->ooa_attr.la_valid != 0) {
+ *attr = obj->opo_ooa->ooa_attr;
+ spin_unlock(&obj->opo_lock);
+
+ RETURN(0);
+ }
+ spin_unlock(&obj->opo_lock);
+ }
+
+ update = out_create_update_req(dev);
+ if (IS_ERR(update))
+ RETURN(PTR_ERR(update));
+
+ rc = out_insert_update(env, update, OBJ_ATTR_GET,
+ lu_object_fid(&dt->do_lu), 0, NULL, NULL);
+ if (rc != 0) {
+ CERROR("%s: Insert update error "DFID": rc = %d\n",
+ dev->dd_lu_dev.ld_obd->obd_name,
+ PFID(lu_object_fid(&dt->do_lu)), rc);
+
+ GOTO(out, rc);
+ }
+
+ rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
+ if (rc != 0) {
+ /* -ENOENT is an expected answer and is recorded silently. */
+ if (rc == -ENOENT) {
+ osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS;
+ obj->opo_non_exist = 1;
+ } else {
+ CERROR("%s:osp_attr_get update error "DFID": rc = %d\n",
+ dev->dd_lu_dev.ld_obd->obd_name,
+ PFID(lu_object_fid(&dt->do_lu)), rc);
+ }
+
+ GOTO(out, rc);
+ }
+
+ osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS;
+ obj->opo_non_exist = 0;
+ reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
+ UPDATE_BUFFER_SIZE);
+ if (reply == NULL || reply->ur_version != UPDATE_REPLY_V1)
+ GOTO(out, rc = -EPROTO);
+
+ rc = osp_get_attr_from_reply(env, reply, attr, obj, 0);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ /* For non-OST objects la_flags == 1 clears opo_empty and any other
+ * value sets it.
+ * NOTE(review): presumably the peer encodes directory emptiness in
+ * la_flags -- confirm against the OUT handler. */
+ if (!is_ost_obj(&dt->do_lu)) {
+ if (attr->la_flags == 1)
+ obj->opo_empty = 0;
+ else
+ obj->opo_empty = 1;
+ }
+
+ GOTO(out, rc = 0);
+
+out:
+ /* Common exit: release the request (if any) and the update. */
+ if (req != NULL)
+ ptlrpc_req_finished(req);
+
+ out_destroy_update_req(update);
+
+ return rc;
+}
+
static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt,
const struct lu_attr *attr, struct thandle *th)
{
struct osp_device *d = lu2osp_dev(dt->do_lu.lo_dev);
struct osp_object *o = dt2osp_obj(dt);
+ struct lu_attr *la;
int rc = 0;
ENTRY;
* track all UID/GID changes via llog
*/
rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th);
+ if (rc != 0 || o->opo_ooa == NULL)
+ RETURN(rc);
- RETURN(rc);
+ la = &o->opo_ooa->ooa_attr;
+ spin_lock(&o->opo_lock);
+ if (attr->la_valid & LA_UID) {
+ la->la_uid = attr->la_uid;
+ la->la_valid |= LA_UID;
+ }
+
+ if (attr->la_valid & LA_GID) {
+ la->la_gid = attr->la_gid;
+ la->la_valid |= LA_GID;
+ }
+ spin_unlock(&o->opo_lock);
+
+ RETURN(0);
}
static int osp_attr_set(const struct lu_env *env, struct dt_object *dt,
RETURN(rc);
}
+/*
+ * Completion callback for the OBJ_XATTR_GET update queued by
+ * osp_declare_xattr_get().
+ *
+ * \a data is the cache entry registered with the update; its reference is
+ * dropped here on every path. \a rc is the result of that single update:
+ *  - 0: the value in slot \a index of \a reply is copied into the entry,
+ *    which becomes ready and existing;
+ *  - -ENOENT / -ENODATA: the entry becomes ready but non-existing;
+ *  - anything else: the entry is marked not ready.
+ *
+ * \retval 0		the entry state was updated
+ * \retval negative	error from update_get_reply_buf()
+ * \retval -ERANGE	the value does not fit the entry's value space
+ */
+static int osp_xattr_get_interpterer(const struct lu_env *env,
+ struct update_reply *reply,
+ struct osp_object *obj,
+ void *data, int index, int rc)
+{
+ struct osp_object_attr *ooa = obj->opo_ooa;
+ struct osp_xattr_entry *oxe = data;
+ struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2;
+
+ LASSERT(ooa != NULL);
+
+ if (rc == 0) {
+ /* Bytes of the entry taken by header, name and separator;
+ * the remainder of oxe_buflen is the value space. */
+ int len = sizeof(*oxe) + oxe->oxe_namelen + 1;
+
+ rc = update_get_reply_buf(reply, rbuf, index);
+ if (rc < 0 || rbuf->lb_len > (oxe->oxe_buflen - len)) {
+ spin_lock(&obj->opo_lock);
+ oxe->oxe_ready = 0;
+ spin_unlock(&obj->opo_lock);
+ osp_oac_xattr_put(oxe);
+
+ return rc < 0 ? rc : -ERANGE;
+ }
+
+ spin_lock(&obj->opo_lock);
+ oxe->oxe_vallen = rbuf->lb_len;
+ memcpy(oxe->oxe_value, rbuf->lb_buf, rbuf->lb_len);
+ oxe->oxe_exist = 1;
+ oxe->oxe_ready = 1;
+ spin_unlock(&obj->opo_lock);
+ } else if (rc == -ENOENT || rc == -ENODATA) {
+ /* Cache the negative answer too. */
+ spin_lock(&obj->opo_lock);
+ oxe->oxe_exist = 0;
+ oxe->oxe_ready = 1;
+ spin_unlock(&obj->opo_lock);
+ } else {
+ spin_lock(&obj->opo_lock);
+ oxe->oxe_ready = 0;
+ spin_unlock(&obj->opo_lock);
+ }
+
+ osp_oac_xattr_put(oxe);
+
+ return 0;
+}
+
+/*
+ * Queue an asynchronous OBJ_XATTR_GET for xattr \a name of \a dt and push
+ * the batched request out.
+ *
+ * A cache entry sized for buf->lb_len value bytes is looked up or created
+ * and handed, with its reference, to osp_xattr_get_interpterer(). When
+ * queuing fails the reference is dropped here instead. A zero-length
+ * \a buf (size query) queues nothing. \a capa is not used.
+ *
+ * \retval 0		nothing to do, or the update was queued and sent
+ * \retval -ENOMEM	the cache or the cache entry could not be allocated
+ * \retval negative	request creation, insertion or sending failed
+ */
+static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt,
+				 struct lu_buf *buf, const char *name,
+				 struct lustre_capa *capa)
+{
+	struct osp_object	*obj = dt2osp_obj(dt);
+	struct osp_device	*osp = lu2osp_dev(dt->do_lu.lo_dev);
+	struct update_request	*update;
+	struct osp_xattr_entry	*oxe;
+	int			 namelen;
+	int			 rc = 0;
+
+	LASSERT(buf != NULL);
+	LASSERT(name != NULL);
+
+	/* If only for xattr size, return directly. */
+	if (unlikely(buf->lb_len == 0))
+		return 0;
+
+	/* Measured only after the assertion above has vetted the pointer;
+	 * doing it in the initializer would dereference a NULL name before
+	 * the LASSERT could fire. */
+	namelen = strlen(name);
+
+	if (obj->opo_ooa == NULL) {
+		rc = osp_oac_init(obj);
+		if (rc != 0)
+			return rc;
+	}
+
+	oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len);
+	if (oxe == NULL)
+		return -ENOMEM;
+
+	mutex_lock(&osp->opd_async_requests_mutex);
+	update = osp_find_or_create_async_update_request(osp);
+	if (IS_ERR(update)) {
+		rc = PTR_ERR(update);
+		mutex_unlock(&osp->opd_async_requests_mutex);
+		osp_oac_xattr_put(oxe);
+	} else {
+		rc = osp_insert_async_update(env, update, OBJ_XATTR_GET, obj,
+					     1, &namelen, &name, oxe,
+					     osp_xattr_get_interpterer);
+		if (rc != 0) {
+			mutex_unlock(&osp->opd_async_requests_mutex);
+			osp_oac_xattr_put(oxe);
+		} else {
+			/* XXX: Currently, we trigger the batched async OUT
+			 *	RPC via dt_declare_xattr_get(). It is not
+			 *	perfect solution, but works well now.
+			 *
+			 *	We will improve it in the future. */
+			update = osp->opd_async_requests;
+			if (update != NULL && update->ur_buf != NULL &&
+			    update->ur_buf->ub_count > 0) {
+				/* Detach the batch before dropping the mutex
+				 * so nobody appends to it while it is sent. */
+				osp->opd_async_requests = NULL;
+				mutex_unlock(&osp->opd_async_requests_mutex);
+				rc = osp_unplug_async_update(env, osp, update);
+			} else {
+				mutex_unlock(&osp->opd_async_requests_mutex);
+			}
+		}
+	}
+
+	return rc;
+}
+
+/*
+ * Fetch the value of extended attribute \a name of \a dt into \a buf.
+ *
+ * A ready cache entry answers the call directly. Otherwise a synchronous
+ * OBJ_XATTR_GET update is sent and, when the object has a cache (opo_ooa),
+ * the outcome (value, or negative result for -ENOENT/-ENODATA) is stored
+ * in the cache. A NULL buf->lb_buf only queries the value length.
+ * \a capa is not used.
+ *
+ * \retval positive	length of the value
+ * \retval -ENOENT	the object is flagged as non-existent
+ * \retval -ENODATA	the cache says the xattr does not exist
+ * \retval -ERANGE	\a buf is too small for the value
+ * \retval -EPROTO	the reply is missing or has an unexpected version
+ * \retval negative	other errors from building or sending the update
+ */
+int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
+		  struct lu_buf *buf, const char *name,
+		  struct lustre_capa *capa)
+{
+	struct osp_device	*osp = lu2osp_dev(dt->do_lu.lo_dev);
+	struct osp_object	*obj = dt2osp_obj(dt);
+	struct dt_device	*dev = &osp->opd_dt_dev;
+	struct lu_buf		*rbuf = &osp_env_info(env)->osi_lb2;
+	struct update_request	*update = NULL;
+	struct ptlrpc_request	*req = NULL;
+	struct update_reply	*reply;
+	struct osp_xattr_entry	*oxe = NULL;
+	const char		*dname = dt->do_lu.lo_dev->ld_obd->obd_name;
+	int			 namelen;
+	int			 hdrlen;
+	int			 rc = 0;
+	ENTRY;
+
+	LASSERT(buf != NULL);
+	LASSERT(name != NULL);
+
+	if (unlikely(obj->opo_non_exist))
+		RETURN(-ENOENT);
+
+	/* Try the cache first; only a "ready" entry is authoritative. */
+	oxe = osp_oac_xattr_find(obj, name);
+	if (oxe != NULL) {
+		spin_lock(&obj->opo_lock);
+		if (oxe->oxe_ready) {
+			if (!oxe->oxe_exist)
+				GOTO(unlock, rc = -ENODATA);
+
+			if (buf->lb_buf == NULL)
+				GOTO(unlock, rc = oxe->oxe_vallen);
+
+			if (buf->lb_len < oxe->oxe_vallen)
+				GOTO(unlock, rc = -ERANGE);
+
+			memcpy(buf->lb_buf, oxe->oxe_value, oxe->oxe_vallen);
+
+			GOTO(unlock, rc = oxe->oxe_vallen);
+
+unlock:
+			spin_unlock(&obj->opo_lock);
+			osp_oac_xattr_put(oxe);
+
+			return rc;
+		}
+		spin_unlock(&obj->opo_lock);
+	}
+
+	update = out_create_update_req(dev);
+	if (IS_ERR(update))
+		GOTO(out, rc = PTR_ERR(update));
+
+	namelen = strlen(name);
+	rc = out_insert_update(env, update, OBJ_XATTR_GET,
+			       lu_object_fid(&dt->do_lu), 1, &namelen, &name);
+	if (rc != 0) {
+		CERROR("%s: Insert update error "DFID": rc = %d\n",
+		       dname, PFID(lu_object_fid(&dt->do_lu)), rc);
+
+		GOTO(out, rc);
+	}
+
+	rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
+	if (rc != 0) {
+		/* Without a cache there is nothing to record. */
+		if (obj->opo_ooa == NULL)
+			GOTO(out, rc);
+
+		if (oxe == NULL)
+			oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len);
+
+		if (oxe == NULL) {
+			CWARN("%s: Fail to add xattr (%s) to cache for "
+			      DFID" (1): rc = %d\n", dname, name,
+			      PFID(lu_object_fid(&dt->do_lu)), rc);
+
+			GOTO(out, rc);
+		}
+
+		/* Remember a definite "no such xattr"; forget anything else. */
+		spin_lock(&obj->opo_lock);
+		if (rc == -ENOENT || rc == -ENODATA) {
+			oxe->oxe_exist = 0;
+			oxe->oxe_ready = 1;
+		} else {
+			oxe->oxe_ready = 0;
+		}
+		spin_unlock(&obj->opo_lock);
+
+		GOTO(out, rc);
+	}
+
+	reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
+					    UPDATE_BUFFER_SIZE);
+	/* A missing reply buffer is a protocol error, as in osp_attr_get();
+	 * it must be caught before ur_version is read. */
+	if (reply == NULL)
+		GOTO(out, rc = -EPROTO);
+
+	if (reply->ur_version != UPDATE_REPLY_V1) {
+		CERROR("%s: Wrong version %x expected %x "DFID": rc = %d\n",
+		       dname, reply->ur_version, UPDATE_REPLY_V1,
+		       PFID(lu_object_fid(&dt->do_lu)), -EPROTO);
+
+		GOTO(out, rc = -EPROTO);
+	}
+
+	rc = update_get_reply_buf(reply, rbuf, 0);
+	if (rc < 0)
+		GOTO(out, rc);
+
+	LASSERT(rbuf->lb_len > 0 && rbuf->lb_len < PAGE_CACHE_SIZE);
+
+	if (buf->lb_buf == NULL)
+		GOTO(out, rc = rbuf->lb_len);
+
+	if (unlikely(buf->lb_len < rbuf->lb_len))
+		GOTO(out, rc = -ERANGE);
+
+	memcpy(buf->lb_buf, rbuf->lb_buf, rbuf->lb_len);
+	rc = rbuf->lb_len;
+	if (obj->opo_ooa == NULL)
+		GOTO(out, rc);
+
+	if (oxe == NULL) {
+		oxe = osp_oac_xattr_find_or_add(obj, name, rbuf->lb_len);
+		if (oxe == NULL) {
+			CWARN("%s: Fail to add xattr (%s) to "
+			      "cache for "DFID" (2): rc = %d\n",
+			      dname, name, PFID(lu_object_fid(&dt->do_lu)), rc);
+
+			GOTO(out, rc);
+		}
+	}
+
+	/* oxe_buflen covers header, name and separator as well, so the value
+	 * space is what remains after them (the same computation as in
+	 * osp_xattr_get_interpterer()). Leaving the header out would let the
+	 * memcpy below run past the end of the allocation. */
+	hdrlen = sizeof(*oxe) + oxe->oxe_namelen + 1;
+	if (rbuf->lb_len > (oxe->oxe_buflen - hdrlen)) {
+		struct osp_xattr_entry *old = oxe;
+		struct osp_xattr_entry *tmp;
+
+		tmp = osp_oac_xattr_replace(obj, &old, rbuf->lb_len);
+		if (tmp == NULL) {
+			CWARN("%s: Fail to update xattr (%s) to "
+			      "cache for "DFID": rc = %d\n",
+			      dname, name, PFID(lu_object_fid(&dt->do_lu)), rc);
+			/* The too-small entry stays cached; invalidate it
+			 * while our reference is still held. "out" drops
+			 * that reference. */
+			spin_lock(&obj->opo_lock);
+			oxe->oxe_ready = 0;
+			spin_unlock(&obj->opo_lock);
+
+			GOTO(out, rc);
+		}
+
+		/* Drop our ref on the entry we looked up ... */
+		osp_oac_xattr_put(oxe);
+		oxe = tmp;
+
+		/* Drop the ref for entry on list. */
+		osp_oac_xattr_put(old);
+	}
+
+	spin_lock(&obj->opo_lock);
+	oxe->oxe_vallen = rbuf->lb_len;
+	memcpy(oxe->oxe_value, rbuf->lb_buf, rbuf->lb_len);
+	oxe->oxe_exist = 1;
+	oxe->oxe_ready = 1;
+	spin_unlock(&obj->opo_lock);
+
+	GOTO(out, rc);
+
+out:
+	if (req != NULL)
+		ptlrpc_req_finished(req);
+
+	if (update != NULL && !IS_ERR(update))
+		out_destroy_update_req(update);
+
+	if (oxe != NULL)
+		osp_oac_xattr_put(oxe);
+
+	return rc;
+}
+
+/*
+ * Add an OBJ_XATTR_SET update (name, value, little-endian flag) for \a dt
+ * to the update request of transaction \a th and, when the object has a
+ * cache (opo_ooa), store the new value in the xattr cache as well.
+ *
+ * Cache maintenance is best effort: failing to allocate a cache entry is
+ * logged and does not fail the declaration.
+ *
+ * \retval 0		the update was added
+ * \retval negative	the update request could not be found/created or
+ *			the update could not be inserted
+ */
+int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
+			  const struct lu_buf *buf, const char *name,
+			  int flag, struct thandle *th)
+{
+	struct osp_object	*o = dt2osp_obj(dt);
+	struct update_request	*update;
+	struct lu_fid		*fid;
+	struct osp_xattr_entry	*oxe;
+	int			sizes[3] = {strlen(name), buf->lb_len,
+					    sizeof(int)};
+	char			*bufs[3] = {(char *)name, (char *)buf->lb_buf };
+	int			hdrlen;
+	int			rc;
+
+	LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
+
+	update = out_find_create_update_loc(th, dt);
+	if (IS_ERR(update)) {
+		CERROR("%s: Get OSP update buf failed "DFID": rc = %d\n",
+		       dt->do_lu.lo_dev->ld_obd->obd_name,
+		       PFID(lu_object_fid(&dt->do_lu)),
+		       (int)PTR_ERR(update));
+
+		return PTR_ERR(update);
+	}
+
+	/* The flag travels as the third buffer, in wire byte order. */
+	flag = cpu_to_le32(flag);
+	bufs[2] = (char *)&flag;
+
+	fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
+	rc = out_insert_update(env, update, OBJ_XATTR_SET, fid,
+			       ARRAY_SIZE(sizes), sizes, (const char **)bufs);
+	if (rc != 0 || o->opo_ooa == NULL)
+		return rc;
+
+	oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
+	if (oxe == NULL) {
+		CWARN("%s: Fail to add xattr (%s) to cache for "DFID
+		      ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name,
+		      name, PFID(lu_object_fid(&dt->do_lu)), rc);
+
+		return 0;
+	}
+
+	/* oxe_buflen covers header, name and separator as well, so the value
+	 * space is what remains after them (the same computation as in
+	 * osp_xattr_get_interpterer()). Leaving the header out would let the
+	 * memcpy below run past the end of the allocation. */
+	hdrlen = sizeof(*oxe) + oxe->oxe_namelen + 1;
+	if (buf->lb_len > (oxe->oxe_buflen - hdrlen)) {
+		struct osp_xattr_entry *old = oxe;
+		struct osp_xattr_entry *tmp;
+
+		tmp = osp_oac_xattr_replace(o, &old, buf->lb_len);
+		if (tmp == NULL) {
+			CWARN("%s: Fail to update xattr (%s) to cache for "DFID
+			      ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name,
+			      name, PFID(lu_object_fid(&dt->do_lu)), rc);
+			/* The too-small entry stays cached; invalidate it
+			 * while our reference is still held, then let go. */
+			spin_lock(&o->opo_lock);
+			oxe->oxe_ready = 0;
+			spin_unlock(&o->opo_lock);
+			osp_oac_xattr_put(oxe);
+
+			return 0;
+		}
+
+		/* Drop our ref on the entry we looked up ... */
+		osp_oac_xattr_put(oxe);
+		oxe = tmp;
+
+		/* Drop the ref for entry on list. */
+		osp_oac_xattr_put(old);
+	}
+
+	spin_lock(&o->opo_lock);
+	oxe->oxe_vallen = buf->lb_len;
+	memcpy(oxe->oxe_value, buf->lb_buf, buf->lb_len);
+	oxe->oxe_exist = 1;
+	oxe->oxe_ready = 1;
+	spin_unlock(&o->opo_lock);
+	osp_oac_xattr_put(oxe);
+
+	return 0;
+}
+
+/*
+ * Execution-phase hook for setting an xattr. It only emits a debug
+ * message naming the xattr and the object FID; no update is built here.
+ *
+ * \retval 0 always
+ */
+int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
+		  const struct lu_buf *buf, const char *name, int fl,
+		  struct thandle *th, struct lustre_capa *capa)
+{
+	int rc = 0;
+
+	CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name,
+	       PFID(&dt->do_lu.lo_header->loh_fid));
+
+	return rc;
+}
+
static int osp_declare_object_create(const struct lu_env *env,
struct dt_object *dt,
struct lu_attr *attr,
struct lu_fid *fid = &osi->osi_fid;
ENTRY;
+ o->opo_non_exist = 0;
if (o->opo_reserved) {
/* regular case, fid is assigned holding trunsaction open */
osp_object_assign_fid(env, d, o);
RETURN(rc);
}
-static int osp_declare_object_destroy(const struct lu_env *env,
- struct dt_object *dt, struct thandle *th)
+int osp_declare_object_destroy(const struct lu_env *env,
+ struct dt_object *dt, struct thandle *th)
{
struct osp_object *o = dt2osp_obj(dt);
int rc = 0;
RETURN(rc);
}
-static int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
- struct thandle *th)
+int osp_object_destroy(const struct lu_env *env, struct dt_object *dt,
+ struct thandle *th)
{
struct osp_object *o = dt2osp_obj(dt);
int rc = 0;
ENTRY;
+ o->opo_non_exist = 1;
/*
* once transaction is committed put proper command on
* the queue going to our OST
}
+/* Object operations installed by osp_object_init() when is_ost_obj() is
+ * true for the object. */
struct dt_object_operations osp_obj_ops = {
+ .do_declare_attr_get = osp_declare_attr_get,
+ .do_attr_get = osp_attr_get,
 .do_declare_attr_set = osp_declare_attr_set,
 .do_attr_set = osp_attr_set,
+ .do_declare_xattr_get = osp_declare_xattr_get,
+ .do_xattr_get = osp_xattr_get,
+ .do_declare_xattr_set = osp_declare_xattr_set,
+ .do_xattr_set = osp_xattr_set,
 .do_declare_create = osp_declare_object_create,
 .do_create = osp_object_create,
 .do_declare_destroy = osp_declare_object_destroy,
 .do_destroy = osp_object_destroy,
};
-static int is_ost_obj(struct lu_object *lo)
-{
- struct osp_device *osp = lu2osp_dev(lo->lo_dev);
-
- return !osp->opd_connect_mdt;
-}
-
static int osp_object_init(const struct lu_env *env, struct lu_object *o,
const struct lu_object_conf *conf)
{
int rc = 0;
ENTRY;
+ spin_lock_init(&po->opo_lock);
+ o->lo_header->loh_attr |= LOHA_REMOTE;
+
if (is_ost_obj(o)) {
po->opo_obj.do_ops = &osp_obj_ops;
} else {
- struct lu_attr *la = &osp_env_info(env)->osi_attr;
+ struct lu_attr *la = &osp_env_info(env)->osi_attr;
po->opo_obj.do_ops = &osp_md_obj_ops;
- o->lo_header->loh_attr |= LOHA_REMOTE;
rc = po->opo_obj.do_ops->do_attr_get(env, lu2dt_obj(o),
la, NULL);
if (rc == 0)
o->lo_header->loh_attr |=
LOHA_EXISTS | (la->la_mode & S_IFMT);
- if (rc == -ENOENT)
+ if (rc == -ENOENT) {
+ po->opo_non_exist = 1;
rc = 0;
+ }
}
RETURN(rc);
}
dt_object_fini(&obj->opo_obj);
lu_object_header_fini(h);
+ if (obj->opo_ooa != NULL) {
+ struct osp_xattr_entry *oxe;
+ struct osp_xattr_entry *tmp;
+ int count;
+
+ list_for_each_entry_safe(oxe, tmp,
+ &obj->opo_ooa->ooa_xattr_list,
+ oxe_list) {
+ list_del(&oxe->oxe_list);
+ count = atomic_read(&oxe->oxe_ref);
+ LASSERTF(count == 1,
+ "Still has %d users on the xattr entry %.*s\n",
+ count - 1, oxe->oxe_namelen, oxe->oxe_buf);
+
+ OBD_FREE(oxe, oxe->oxe_buflen);
+ }
+ OBD_FREE_PTR(obj->opo_ooa);
+ }
OBD_SLAB_FREE_PTR(obj, osp_object_kmem);
}
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2013, Intel Corporation.
+ */
+/*
+ * lustre/osp/osp_trans.c
+ *
+ * Author: Di Wang <di.wang@intel.com>
+ * Author: Fan, Yong <fan.yong@intel.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include "osp_internal.h"
+
+/* Per-request arguments stored in the ptlrpc request's async args and read
+ * back by osp_async_update_interpret(). */
+struct osp_async_update_args {
+ /* the update request whose callbacks run when the reply arrives */
+ struct update_request *oaua_update;
+};
+
+/* One callback registration for a single update inside a batched
+ * asynchronous update request. */
+struct osp_async_update_item {
+ /* linkage on update_request::ur_cb_items */
+ struct list_head oaui_list;
+ /* object the update targets; a reference is held while queued */
+ struct osp_object *oaui_obj;
+ /* opaque argument passed through to the callback */
+ void *oaui_data;
+ /* callback invoked with the per-update result */
+ osp_async_update_interpterer_t oaui_interpterer;
+};
+
+/*
+ * Allocate a callback item binding \a obj, \a data and \a interpterer.
+ * A reference on the object is taken for as long as the item lives.
+ *
+ * \retval the new, unlinked item, or NULL if allocation failed
+ */
+static struct osp_async_update_item *
+osp_async_update_item_init(struct osp_object *obj, void *data,
+			   osp_async_update_interpterer_t interpterer)
+{
+	struct osp_async_update_item *item;
+
+	OBD_ALLOC_PTR(item);
+	if (item != NULL) {
+		lu_object_get(osp2lu_obj(obj));
+		INIT_LIST_HEAD(&item->oaui_list);
+		item->oaui_obj = obj;
+		item->oaui_data = data;
+		item->oaui_interpterer = interpterer;
+	}
+
+	return item;
+}
+
+/*
+ * Destroy a callback item: release the object reference taken at init
+ * time and free the item. The item must already be off any list.
+ */
+static void osp_async_update_item_fini(const struct lu_env *env,
+				       struct osp_async_update_item *oaui)
+{
+	struct osp_object *obj = oaui->oaui_obj;
+
+	LASSERT(list_empty(&oaui->oaui_list));
+
+	lu_object_put(env, osp2lu_obj(obj));
+	OBD_FREE_PTR(oaui);
+}
+
+/*
+ * Reply handler for a batched asynchronous update RPC.
+ *
+ * Walks the callback items of the update request in queue order and calls
+ * each one with the result of the update at the same position in the
+ * reply. The result is read from the leading int of that reply buffer.
+ * Items without a usable reply slot get the RPC status \a rc, or -EINVAL
+ * if that status is 0. Each item and finally the update request itself
+ * are destroyed.
+ *
+ * \retval 0 always
+ */
+static int osp_async_update_interpret(const struct lu_env *env,
+ struct ptlrpc_request *req,
+ void *arg, int rc)
+{
+ struct update_reply *reply = NULL;
+ struct osp_async_update_args *oaua = arg;
+ struct update_request *update = oaua->oaua_update;
+ struct osp_async_update_item *oaui;
+ struct osp_async_update_item *next;
+ int count = 0;
+ int index = 0;
+ int rc1 = 0;
+
+ /* NOTE(review): rc1 assigned in this block is overwritten for every
+ * item in the loop below, so the -EPROTO chosen for a malformed
+ * reply never reaches a callback -- confirm this is intended. */
+ if (rc == 0 || req->rq_repmsg != NULL) {
+ reply = req_capsule_server_sized_get(&req->rq_pill,
+ &RMF_UPDATE_REPLY,
+ UPDATE_BUFFER_SIZE);
+ if (reply == NULL || reply->ur_version != UPDATE_REPLY_V1)
+ rc1 = -EPROTO;
+ else
+ count = reply->ur_count;
+ } else {
+ rc1 = rc;
+ }
+
+ list_for_each_entry_safe(oaui, next, &update->ur_cb_items, oaui_list) {
+ list_del_init(&oaui->oaui_list);
+ /* count stays 0 when the reply is unusable, so reply is only
+ * dereferenced here when it passed the checks above. */
+ if (index < count && reply->ur_lens[index] > 0) {
+ char *ptr = update_get_buf_internal(reply, index, NULL);
+
+ LASSERT(ptr != NULL);
+
+ /* the buffer starts with the result of this update */
+ rc1 = le32_to_cpu(*(int *)ptr);
+ } else {
+ rc1 = rc;
+ if (unlikely(rc1 == 0))
+ rc1 = -EINVAL;
+ }
+
+ oaui->oaui_interpterer(env, reply, oaui->oaui_obj,
+ oaui->oaui_data, index, rc1);
+ osp_async_update_item_fini(env, oaui);
+ index++;
+ }
+
+ out_destroy_update_req(update);
+
+ return 0;
+}
+
+/*
+ * Send the batched asynchronous update request \a update to the peer.
+ *
+ * On success the request is handed to ptlrpcd and
+ * osp_async_update_interpret() finishes the job when the reply arrives.
+ * If the RPC cannot be prepared, every queued callback is invoked at once
+ * with the error (and a NULL reply) and the update request is destroyed.
+ *
+ * \retval 0		the RPC was queued
+ * \retval negative	error from out_prep_update_req()
+ */
+int osp_unplug_async_update(const struct lu_env *env,
+			    struct osp_device *osp,
+			    struct update_request *update)
+{
+	struct osp_async_update_args	*args;
+	struct osp_async_update_item	*item;
+	struct osp_async_update_item	*tmp;
+	struct ptlrpc_request		*req = NULL;
+	int				 rc;
+
+	rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
+				 update->ur_buf, UPDATE_BUFFER_SIZE, &req);
+	if (rc == 0) {
+		LASSERT(list_empty(&update->ur_list));
+
+		args = ptlrpc_req_async_args(req);
+		args->oaua_update = update;
+		req->rq_interpret_reply = osp_async_update_interpret;
+		ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+
+		return rc;
+	}
+
+	/* No RPC will be sent: fail every pending callback right away. */
+	list_for_each_entry_safe(item, tmp, &update->ur_cb_items, oaui_list) {
+		list_del_init(&item->oaui_list);
+		item->oaui_interpterer(env, NULL, item->oaui_obj,
+				       item->oaui_data, 0, rc);
+		osp_async_update_item_fini(env, item);
+	}
+	out_destroy_update_req(update);
+
+	return rc;
+}
+
+/*
+ * Return the device's pending batched update request, creating and
+ * remembering a new one when there is none.
+ *
+ * Called with osp::opd_async_requests_mutex held.
+ *
+ * \retval the pending request, or the ERR_PTR() from
+ *	   out_create_update_req() if creation failed
+ */
+struct update_request *
+osp_find_or_create_async_update_request(struct osp_device *osp)
+{
+	struct update_request *pending = osp->opd_async_requests;
+
+	if (pending == NULL) {
+		pending = out_create_update_req(&osp->opd_dt_dev);
+		if (!IS_ERR(pending))
+			osp->opd_async_requests = pending;
+	}
+
+	return pending;
+}
+
+/*
+ * Append one update (operation \a op on \a obj with \a count buffers) to
+ * the batched request \a update and register \a interpterer, with \a data,
+ * as its completion callback.
+ *
+ * If the request is full (-E2BIG) it is detached from the device and sent,
+ * and the insertion is retried on a fresh request. The mutex is dropped
+ * while the full request is being sent.
+ *
+ * Called with osp::opd_async_requests_mutex held; it is held again on
+ * return.
+ *
+ * \retval 0		the update and its callback were queued
+ * \retval -ENOMEM	the callback item could not be allocated
+ * \retval negative	insertion, sending or request creation failed
+ */
+/* with osp::opd_async_requests_mutex held */
+int osp_insert_async_update(const struct lu_env *env,
+ struct update_request *update, int op,
+ struct osp_object *obj, int count,
+ int *lens, const char **bufs, void *data,
+ osp_async_update_interpterer_t interpterer)
+{
+ struct osp_async_update_item *oaui;
+ struct osp_device *osp = lu2osp_dev(osp2lu_obj(obj)->lo_dev);
+ int rc = 0;
+ ENTRY;
+
+ oaui = osp_async_update_item_init(obj, data, interpterer);
+ if (oaui == NULL)
+ RETURN(-ENOMEM);
+
+again:
+ rc = out_insert_update(env, update, op, lu_object_fid(osp2lu_obj(obj)),
+ count, lens, bufs);
+ if (rc == -E2BIG) {
+ /* Detach the full request so others start a new batch, then
+ * send it without holding the mutex. */
+ osp->opd_async_requests = NULL;
+ mutex_unlock(&osp->opd_async_requests_mutex);
+
+ rc = osp_unplug_async_update(env, osp, update);
+ mutex_lock(&osp->opd_async_requests_mutex);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ update = osp_find_or_create_async_update_request(osp);
+ if (IS_ERR(update))
+ GOTO(out, rc = PTR_ERR(update));
+
+ /* NOTE(review): if a single update can never fit even an
+ * empty request, this retry would not terminate -- confirm
+ * that out_insert_update() cannot return -E2BIG then. */
+ goto again;
+ }
+
+ if (rc == 0)
+ list_add_tail(&oaui->oaui_list, &update->ur_cb_items);
+
+ GOTO(out, rc);
+
+out:
+ /* The item was never linked on failure, so it is released here. */
+ if (rc != 0)
+ osp_async_update_item_fini(env, oaui);
+
+ return rc;
+}
+
+/*
+ * Allocate a transaction handle bound to device \a d, tagged
+ * LCT_TX_HANDLE and with an empty remote update list.
+ *
+ * \retval the new handle, or ERR_PTR(-ENOMEM) if allocation failed
+ */
+struct thandle *osp_trans_create(const struct lu_env *env,
+				 struct dt_device *d)
+{
+	struct thandle *handle;
+
+	OBD_ALLOC_PTR(handle);
+	if (unlikely(handle == NULL))
+		return ERR_PTR(-ENOMEM);
+
+	INIT_LIST_HEAD(&handle->th_remote_update_list);
+	handle->th_tags = LCT_TX_HANDLE;
+	handle->th_dev = d;
+
+	return handle;
+}
+
+/*
+ * Send the updates collected in th->th_current_request, if there are any.
+ *
+ * For a remote transaction the update request is detached from the handle
+ * and sent asynchronously through ptlrpcd; it is destroyed here if the
+ * RPC cannot be prepared. Otherwise the handle is marked synchronous and
+ * the request is sent with out_remote_sync(), staying attached to the
+ * handle.
+ *
+ * \retval 0		nothing to send, or the request was sent/queued
+ * \retval negative	preparing or sending the RPC failed
+ */
+static int osp_trans_trigger(const struct lu_env *env, struct osp_device *osp,
+ struct thandle *th)
+{
+ struct update_request *update = th->th_current_request;
+ int rc = 0;
+
+ /* Nothing was declared against this device: nothing to send. */
+ if (unlikely(update == NULL || update->ur_buf == NULL ||
+ update->ur_buf->ub_count == 0))
+ return 0;
+
+ if (is_remote_trans(th)) {
+ struct osp_async_update_args *args;
+ struct ptlrpc_request *req;
+
+ /* From here on the request no longer belongs to the handle. */
+ list_del_init(&update->ur_list);
+ th->th_current_request = NULL;
+ rc = out_prep_update_req(env, osp->opd_obd->u.cli.cl_import,
+ update->ur_buf,
+ UPDATE_BUFFER_SIZE, &req);
+ if (rc == 0) {
+ args = ptlrpc_req_async_args(req);
+ args->oaua_update = update;
+ req->rq_interpret_reply =
+ osp_async_update_interpret;
+ ptlrpcd_add_req(req, PDL_POLICY_LOCAL, -1);
+ } else {
+ out_destroy_update_req(update);
+ }
+ } else {
+ th->th_sync = 1;
+ rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import,
+ update, NULL);
+ }
+
+ return rc;
+}
+
+/*
+ * Start hook of the OSP transaction. Updates of a non-remote transaction
+ * are sent right here; a remote transaction sends nothing at start time.
+ *
+ * \retval 0		nothing to do, or the updates were sent
+ * \retval negative	error from osp_trans_trigger()
+ */
+int osp_trans_start(const struct lu_env *env, struct dt_device *dt,
+		    struct thandle *th)
+{
+	if (is_remote_trans(th))
+		return 0;
+
+	return osp_trans_trigger(env, dt2osp_dev(dt), th);
+}
+
+/*
+ * Stop hook of the OSP transaction.
+ *
+ * Remote transaction: the update request for th->th_dev is looked up and,
+ * if the transaction succeeded so far (th_result == 0), sent through
+ * osp_trans_trigger(). A request still attached to the handle afterwards
+ * is destroyed, and the handle itself is freed.
+ *
+ * Otherwise: the result recorded in the update request (ur_rc) is
+ * returned and the request is destroyed; the handle is not freed here.
+ *
+ * \retval 0		success
+ * \retval negative	th_result, ur_rc or an error from the trigger
+ */
+int osp_trans_stop(const struct lu_env *env, struct thandle *th)
+{
+ struct update_request *update = th->th_current_request;
+ int rc = 0;
+
+ if (is_remote_trans(th)) {
+ LASSERT(update == NULL);
+
+ update = out_find_update(th, th->th_dev);
+ th->th_current_request = update;
+ if (th->th_result == 0)
+ rc = osp_trans_trigger(env, dt2osp_dev(th->th_dev), th);
+ else
+ rc = th->th_result;
+
+ /* osp_trans_trigger() clears th_current_request once it has
+ * taken the request over; only a leftover is destroyed. */
+ if (th->th_current_request != NULL)
+ out_destroy_update_req(update);
+
+ OBD_FREE_PTR(th);
+ } else {
+ LASSERT(update != NULL);
+
+ rc = update->ur_rc;
+ out_destroy_update_req(update);
+ th->th_current_request = NULL;
+ }
+
+ return rc;
+}
target_objs := $(TARGET)tgt_main.o $(TARGET)tgt_lastrcvd.o
target_objs += $(TARGET)tgt_handler.o $(TARGET)out_handler.o
+target_objs += $(TARGET)out_lib.o
ptlrpc-objs := $(ldlm_objs) $(ptlrpc_objs)
@SERVER_TRUE@ptlrpc-objs += $(target_objs)
tgt_%.c: @LUSTRE@/target/tgt_%.c
ln -sf $< $@
+out_%.c: @LUSTRE@/target/out_%.c
+ ln -sf $< $@
+
EXTRA_DIST := $(ptlrpc_objs:.o=.c) ptlrpc_internal.h
EXTRA_DIST += $(TARGET)tgt_internal.h
@SERVER_FALSE@EXTRA_DIST += $(target_objs:.o=.c)
}
}
#endif
- /*
- * XXX So far only "client" ptlrpcd uses an environment. In
- * the future, ptlrpcd thread (or a thread-set) has to given
- * an argument, describing its "scope".
- */
- rc = lu_context_init(&env.le_ctx,
- LCT_CL_THREAD|LCT_REMEMBER|LCT_NOREF);
+ /* Both client and server (MDT/OST) may use the environment. */
+ rc = lu_context_init(&env.le_ctx, LCT_MD_THREAD | LCT_DT_THREAD |
+ LCT_CL_THREAD | LCT_REMEMBER |
+ LCT_NOREF);
if (rc == 0) {
rc = lu_context_init(env.le_ses,
LCT_SESSION|LCT_REMEMBER|LCT_NOREF);
#
MOSTLYCLEANFILES := @MOSTLYCLEANFILES@
-EXTRA_DIST = tgt_main.c tgt_lastrcvd.c tgt_handler.c tgt_internal.h out_handler.c
+EXTRA_DIST = tgt_main.c tgt_lastrcvd.c tgt_handler.c tgt_internal.h \
+ out_handler.c out_lib.c
}
static int out_tx_start(const struct lu_env *env, struct dt_device *dt,
- struct thandle_exec_args *ta)
+ struct thandle_exec_args *ta, struct obd_export *exp)
{
memset(ta, 0, sizeof(*ta));
ta->ta_handle = dt_trans_create(env, dt);
return PTR_ERR(ta->ta_handle);
}
ta->ta_dev = dt;
- /*For phase I, sync for cross-ref operation*/
- ta->ta_handle->th_sync = 1;
+ if (exp->exp_need_sync)
+ ta->ta_handle->th_sync = 1;
+
return 0;
}
static int out_trans_start(const struct lu_env *env,
struct thandle_exec_args *ta)
{
- /* Always do sync commit for Phase I */
- LASSERT(ta->ta_handle->th_sync != 0);
return dt_trans_start(env, ta->ta_dev, ta->ta_handle);
}
int rc;
ta->ta_handle->th_result = err;
- LASSERT(ta->ta_handle->th_sync != 0);
rc = dt_trans_stop(env, ta->ta_dev, ta->ta_handle);
for (i = 0; i < ta->ta_argno; i++) {
if (ta->ta_args[i].object != NULL) {
0, rc);
update_insert_reply(tti->tti_u.update.tti_update_reply, obdo,
- sizeof(*obdo), 0, rc);
+ sizeof(*obdo),
+ tti->tti_u.update.tti_update_reply_index, rc);
RETURN(rc);
}
struct dt_object *obj = tti->tti_u.update.tti_dt_object;
char *name;
void *ptr;
+ int idx = tti->tti_u.update.tti_update_reply_index;
int rc;
ENTRY;
RETURN(err_serious(-EPROTO));
}
- ptr = update_get_buf_internal(reply, 0, NULL);
+ ptr = update_get_buf_internal(reply, idx, NULL);
LASSERT(ptr != NULL);
/* The first 4 bytes(int) are used to store the result */
CDEBUG(D_INFO, "%s: "DFID" get xattr %s len %d\n",
tgt_name(tsi->tsi_tgt), PFID(lu_object_fid(&obj->do_lu)),
name, (int)lbuf->lb_len);
+
+ GOTO(out, rc);
+
out:
*(int *)ptr = rc;
- reply->ur_lens[0] = lbuf->lb_len + sizeof(int);
- RETURN(rc);
+ reply->ur_lens[idx] = lbuf->lb_len + sizeof(int);
+
+ return rc;
}
static int out_index_lookup(struct tgt_session_info *tsi)
0, rc);
update_insert_reply(tti->tti_u.update.tti_update_reply,
- &tti->tti_fid1, sizeof(tti->tti_fid1), 0, rc);
+ &tti->tti_fid1, sizeof(tti->tti_fid1),
+ tti->tti_u.update.tti_update_reply_index, rc);
RETURN(rc);
}
RETURN(err_serious(-EPROTO));
}
- if (le32_to_cpu(ubuf->ub_magic) != UPDATE_BUFFER_MAGIC) {
+ if (ubuf->ub_magic != UPDATE_BUFFER_MAGIC) {
CERROR("%s: invalid magic %x expect %x: rc = %d\n",
- tgt_name(tsi->tsi_tgt), le32_to_cpu(ubuf->ub_magic),
+ tgt_name(tsi->tsi_tgt), ubuf->ub_magic,
UPDATE_BUFFER_MAGIC, -EPROTO);
RETURN(err_serious(-EPROTO));
}
- count = le32_to_cpu(ubuf->ub_count);
+ count = ubuf->ub_count;
if (count <= 0) {
CERROR("%s: No update!: rc = %d\n",
tgt_name(tsi->tsi_tgt), -EPROTO);
update_init_reply_buf(update_reply, count);
tti->tti_u.update.tti_update_reply = update_reply;
- rc = out_tx_start(env, dt, ta);
+ rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
if (rc != 0)
RETURN(rc);
if (rc != 0)
RETURN(rc);
- rc = out_tx_start(env, dt, ta);
+ rc = out_tx_start(env, dt, ta, tsi->tsi_exp);
if (rc != 0)
RETURN(rc);
old_batchid = update->u_batchid;
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2013, Intel Corporation.
+ */
+/*
+ * lustre/target/out_lib.c
+ *
+ * Author: Di Wang <di.wang@intel.com>
+ * Author: Fan, Yong <fan.yong@intel.com>
+ */
+
+#define DEBUG_SUBSYSTEM S_CLASS
+
+#include <lu_target.h>
+#include <lustre_update.h>
+#include <obd.h>
+
+/**
+ * Search the remote-update list of a transaction handle for the request
+ * that targets a given device.
+ *
+ * \param[in] th	transaction handle whose th_remote_update_list is walked
+ * \param[in] dt_dev	device compared against each entry's ur_dt
+ *
+ * \retval		first update request whose ur_dt equals \a dt_dev
+ * \retval		NULL if no entry on the list matches
+ */
+struct update_request *out_find_update(struct thandle *th,
+				       struct dt_device *dt_dev)
+{
+	struct update_request *cur;
+	struct update_request *found = NULL;
+
+	list_for_each_entry(cur, &th->th_remote_update_list, ur_list) {
+		if (cur->ur_dt != dt_dev)
+			continue;
+
+		/* first match wins; stop walking */
+		found = cur;
+		break;
+	}
+
+	return found;
+}
+EXPORT_SYMBOL(out_find_update);
+
+/**
+ * Unlink an update request from whatever list it is on and release it.
+ *
+ * A NULL \a update is accepted and ignored. The request must have no
+ * pending callback items (asserted). The update buffer, when present, is
+ * released with the same UPDATE_BUFFER_SIZE it is allocated with in
+ * out_create_update_req().
+ *
+ * \param[in] update	request to destroy, may be NULL
+ */
+void out_destroy_update_req(struct update_request *update)
+{
+	if (update != NULL) {
+		LASSERT(list_empty(&update->ur_cb_items));
+
+		list_del(&update->ur_list);
+
+		if (update->ur_buf != NULL)
+			OBD_FREE_LARGE(update->ur_buf, UPDATE_BUFFER_SIZE);
+
+		OBD_FREE_PTR(update);
+	}
+}
+EXPORT_SYMBOL(out_destroy_update_req);
+
+/**
+ * Allocate an update request together with its UPDATE_BUFFER_SIZE buffer.
+ *
+ * On success the request is bound to \a dt, both of its list heads are
+ * initialised (so it is not yet linked anywhere), and the buffer carries
+ * UPDATE_BUFFER_MAGIC with an update count of zero.
+ *
+ * \param[in] dt	device the request is bound to (stored in ur_dt)
+ *
+ * \retval		new update request on success
+ * \retval		ERR_PTR(-ENOMEM) if either allocation fails
+ */
+struct update_request *out_create_update_req(struct dt_device *dt)
+{
+	struct update_request *req;
+
+	OBD_ALLOC_PTR(req);
+	if (req == NULL)
+		return ERR_PTR(-ENOMEM);
+
+	OBD_ALLOC_LARGE(req->ur_buf, UPDATE_BUFFER_SIZE);
+	if (req->ur_buf != NULL) {
+		INIT_LIST_HEAD(&req->ur_list);
+		INIT_LIST_HEAD(&req->ur_cb_items);
+		req->ur_dt = dt;
+		req->ur_buf->ub_magic = UPDATE_BUFFER_MAGIC;
+		req->ur_buf->ub_count = 0;
+
+		return req;
+	}
+
+	/* buffer allocation failed: drop the half-built request */
+	OBD_FREE_PTR(req);
+
+	return ERR_PTR(-ENOMEM);
+}
+EXPORT_SYMBOL(out_create_update_req);
+
+/**
+ * Return the update request on \a th that targets the device of \a dt,
+ * creating it and appending it to th_remote_update_list when none exists
+ * yet.
+ *
+ * No locking is done here: only one thread is expected to access a given
+ * thandle at a time.
+ *
+ * \param[in] th	transaction handle holding the remote update list
+ * \param[in] dt	object whose device selects the update request
+ *
+ * \retval		existing or newly created update request
+ * \retval		ERR_PTR() value from out_create_update_req() on failure
+ */
+struct update_request *out_find_create_update_loc(struct thandle *th,
+						  struct dt_object *dt)
+{
+	struct dt_device	*dev = lu2dt_dev(dt->do_lu.lo_dev);
+	struct update_request	*req;
+	ENTRY;
+
+	req = out_find_update(th, dev);
+	if (req == NULL) {
+		req = out_create_update_req(dev);
+		/* only a successfully created request joins the list */
+		if (!IS_ERR(req))
+			list_add_tail(&req->ur_list,
+				      &th->th_remote_update_list);
+	}
+
+	RETURN(req);
+}
+EXPORT_SYMBOL(out_find_create_update_loc);
+
+/**
+ * Allocate and pack an UPDATE_OBJ request that carries a copy of \a ubuf.
+ *
+ * The client-side RMF_UPDATE field and the server-side RMF_UPDATE_REPLY
+ * field are both sized to UPDATE_BUFFER_SIZE. The request is directed to
+ * OUT_PORTAL with replies expected on OSC_REPLY_PORTAL.
+ *
+ * \param[in]  env	execution environment (not used by this function)
+ * \param[in]  imp	import the request is allocated on
+ * \param[in]  ubuf	update buffer to copy into the request
+ * \param[in]  ubuf_len	number of bytes of \a ubuf to copy; must be within
+ *			[0, UPDATE_BUFFER_SIZE]
+ * \param[out] reqp	receives the prepared request on success; it is
+ *			left untouched on failure
+ *
+ * \retval 0		on success
+ * \retval -EINVAL	if \a ubuf_len does not fit the request field
+ * \retval -ENOMEM	if the request cannot be allocated
+ * \retval -EPROTO	if the packed request exposes no RMF_UPDATE field
+ * \retval negative	error returned by ptlrpc_request_pack()
+ */
+int out_prep_update_req(const struct lu_env *env, struct obd_import *imp,
+			const struct update_buf *ubuf, int ubuf_len,
+			struct ptlrpc_request **reqp)
+{
+	struct ptlrpc_request	*req;
+	struct update_buf	*tmp;
+	int			 rc;
+	ENTRY;
+
+	/* The RMF_UPDATE field is sized to UPDATE_BUFFER_SIZE below, so a
+	 * larger (or negative) length would make the memcpy() overrun it. */
+	if (ubuf_len < 0 || ubuf_len > UPDATE_BUFFER_SIZE)
+		RETURN(-EINVAL);
+
+	req = ptlrpc_request_alloc(imp, &RQF_UPDATE_OBJ);
+	if (req == NULL)
+		RETURN(-ENOMEM);
+
+	req_capsule_set_size(&req->rq_pill, &RMF_UPDATE, RCL_CLIENT,
+			     UPDATE_BUFFER_SIZE);
+
+	rc = ptlrpc_request_pack(req, LUSTRE_MDS_VERSION, UPDATE_OBJ);
+	if (rc != 0) {
+		ptlrpc_req_finished(req);
+		RETURN(rc);
+	}
+
+	req_capsule_set_size(&req->rq_pill, &RMF_UPDATE_REPLY, RCL_SERVER,
+			     UPDATE_BUFFER_SIZE);
+
+	tmp = req_capsule_client_get(&req->rq_pill, &RMF_UPDATE);
+	if (tmp == NULL) {
+		/* do not copy through a missing field; drop the request */
+		ptlrpc_req_finished(req);
+		RETURN(-EPROTO);
+	}
+
+	memcpy(tmp, ubuf, ubuf_len);
+	ptlrpc_request_set_replen(req);
+	req->rq_request_portal = OUT_PORTAL;
+	req->rq_reply_portal = OSC_REPLY_PORTAL;
+	*reqp = req;
+
+	RETURN(0);
+}
+EXPORT_SYMBOL(out_prep_update_req);
+
+/**
+ * Send the update buffer of \a update to the remote target and wait for
+ * the reply.
+ *
+ * \param[in]  env	execution environment, passed to out_prep_update_req()
+ * \param[in]  imp	import used to build the request
+ * \param[in]  update	update request whose ur_buf is sent; ur_rc is set
+ *			whenever the RPC is released by this function
+ * \param[out] reqp	if not NULL and the RPC did not fail, receives the
+ *			request, which the caller then has to release;
+ *			if NULL the request is released here
+ *
+ * \retval		result of ptlrpc_queue_wait() (may be positive), or
+ *			the error from out_prep_update_req()
+ */
+int out_remote_sync(const struct lu_env *env, struct obd_import *imp,
+		    struct update_request *update,
+		    struct ptlrpc_request **reqp)
+{
+	struct ptlrpc_request	*req = NULL;
+	int			 rc;
+	ENTRY;
+
+	rc = out_prep_update_req(env, imp, update->ur_buf,
+				 UPDATE_BUFFER_SIZE, &req);
+	if (rc != 0)
+		RETURN(rc);
+
+	/* Note: some dt index api might return non-zero result here, like
+	 * osd_index_ea_lookup, so only rc < 0 is treated as a failure */
+	rc = ptlrpc_queue_wait(req);
+	if (rc >= 0 && reqp != NULL) {
+		/* hand the completed request over to the caller */
+		*reqp = req;
+		RETURN(rc);
+	}
+
+	/* RPC failed, or the caller does not want the reply: record the
+	 * result and drop the request */
+	update->ur_rc = rc;
+	ptlrpc_req_finished(req);
+
+	RETURN(rc);
+}
+EXPORT_SYMBOL(out_remote_sync);
+
+/**
+ * Append one update record to the buffer of an update request.
+ *
+ * The record is placed at the rounded end of the data already in the
+ * buffer: a fixed header (fid, operation, batch id, parameter lengths)
+ * followed by the \a count parameter buffers, each packed with LOGL().
+ * On success ub_count is incremented.
+ *
+ * \param[in] env	execution environment (not used by this function)
+ * \param[in] update	update request whose ur_buf receives the record
+ * \param[in] op	operation code stored in u_type
+ * \param[in] fid	fid of the object the update applies to
+ * \param[in] count	number of entries in \a lens and \a bufs; at most
+ *			UPDATE_BUF_COUNT
+ * \param[in] lens	length in bytes of each parameter buffer; must not
+ *			be negative
+ * \param[in] bufs	parameter buffers
+ *
+ * \retval 0		on success
+ * \retval -E2BIG	if there are too many parameters, the buffer already
+ *			holds UPDATE_MAX_OPS updates, or the record does not
+ *			fit into UPDATE_BUFFER_SIZE
+ * \retval -EINVAL	if a parameter length is negative
+ */
+int out_insert_update(const struct lu_env *env, struct update_request *update,
+		      int op, const struct lu_fid *fid, int count,
+		      int *lens, const char **bufs)
+{
+	struct update_buf	*ubuf = update->ur_buf;
+	struct update		*obj_update;
+	char			*ptr;
+	int			 i;
+	int			 hdr_length;
+	int			 update_length;
+	ENTRY;
+
+	/* Reject an oversized parameter list before lens[] is walked */
+	if (count > UPDATE_BUF_COUNT)
+		RETURN(-E2BIG);
+
+	obj_update = (struct update *)((char *)ubuf +
+				cfs_size_round(update_buf_size(ubuf)));
+
+	/* Check update size to make sure it can fit into the buffer.
+	 * Each length is validated on its own first: a negative length
+	 * could shrink the total enough to pass the fit check below, and
+	 * a huge one could overflow the int accumulator. */
+	hdr_length = cfs_size_round(offsetof(struct update, u_bufs[0]));
+	update_length = hdr_length;
+	for (i = 0; i < count; i++) {
+		if (lens[i] < 0)
+			RETURN(-EINVAL);
+		if (lens[i] > UPDATE_BUFFER_SIZE)
+			RETURN(-E2BIG);
+		update_length += cfs_size_round(lens[i]);
+	}
+
+	if (cfs_size_round(update_buf_size(ubuf)) + update_length >
+	    UPDATE_BUFFER_SIZE || ubuf->ub_count >= UPDATE_MAX_OPS)
+		RETURN(-E2BIG);
+
+	/* fill the update into the update buffer */
+	fid_cpu_to_le(&obj_update->u_fid, fid);
+	obj_update->u_type = cpu_to_le32(op);
+	/* NOTE(review): u_batchid is stored without a byte-order conversion,
+	 * unlike u_type and u_lens - confirm this is intended */
+	obj_update->u_batchid = update->ur_batchid;
+	for (i = 0; i < count; i++)
+		obj_update->u_lens[i] = cpu_to_le32(lens[i]);
+
+	/* parameter data starts right after the rounded record header */
+	ptr = (char *)obj_update + hdr_length;
+	for (i = 0; i < count; i++)
+		LOGL(bufs[i], lens[i], ptr);
+
+	ubuf->ub_count++;
+
+	CDEBUG(D_INFO, "%s: %p "DFID" idx %d: op %d params %d:%lu\n",
+	       update->ur_dt->dd_lu_dev.ld_obd->obd_name, ubuf, PFID(fid),
+	       ubuf->ub_count, op, count, update_buf_size(ubuf));
+
+	RETURN(0);
+}
+EXPORT_SYMBOL(out_insert_update);
struct thandle_exec_args {
struct thandle *ta_handle;
struct dt_device *ta_dev;
- int ta_err;
struct tx_arg ta_args[TX_MAX_OPS];
+ int ta_err;
int ta_argno; /* used args */
};
ccb->lncc_exp->exp_client_uuid.uuid);
spin_lock(&ccb->lncc_exp->exp_lock);
- ccb->lncc_exp->exp_need_sync = 0;
+ /* XXX: Currently we use a per-export sync/async policy for
+ * updates sent via OUT RPC. This is a coarse-grained policy and
+ * will be changed to a per-request policy by the DNE II patches. */
+ if (!ccb->lncc_exp->exp_keep_sync)
+ ccb->lncc_exp->exp_need_sync = 0;
+
spin_unlock(&ccb->lncc_exp->exp_lock);
class_export_cb_put(ccb->lncc_exp);