/* obdo.c */
#ifdef __KERNEL__
void obdo_from_la(struct obdo *dst, struct lu_attr *la, __u64 valid);
-void la_from_obdo(struct lu_attr *la, struct obdo *dst, obd_flag valid);
+void la_from_obdo(struct lu_attr *la, const struct obdo *dst, obd_flag valid);
void obdo_refresh_inode(struct inode *dst, struct obdo *src, obd_flag valid);
void obdo_to_inode(struct inode *dst, struct obdo *src, obd_flag valid);
#define ll_inode_flags(inode) (inode->i_flags)
rwlock_t lut_sptlrpc_lock;
struct sptlrpc_rule_set lut_sptlrpc_rset;
int lut_sec_level;
+
+ spinlock_t lut_flags_lock;
unsigned int lut_mds_capa:1,
- lut_oss_capa:1;
+ lut_oss_capa:1,
+ lut_syncjournal:1,
+ lut_sync_lock_cancel:2;
/* LAST_RCVD parameters */
/** last_rcvd file */
struct lu_target *tsi_tgt;
const struct mdt_body *tsi_mdt_body;
+ struct ost_body *tsi_ost_body;
struct lu_object *tsi_corpus;
+ struct lu_fid tsi_fid;
+ struct ldlm_res_id tsi_resid;
/*
* Additional fail id that can be set by handler.
*/
int tsi_reply_fail_id;
int tsi_request_fail_id;
- __u32 tsi_has_trans:1; /* has txn already? */
+ int tsi_has_trans:1; /* has txn already? */
+ /* request JobID */
+ char *tsi_jobid;
};
static inline struct tgt_session_info *tgt_ses_info(const struct lu_env *env)
int th_version;
/* Handler function */
int (*th_act)(struct tgt_session_info *tti);
+ /* Handler function for high priority requests */
+ int (*th_hp)(struct tgt_session_info *tti);
/* Request format for this request */
const struct req_format *th_fmt;
};
void tgt_counter_incr(struct obd_export *exp, int opcode);
int tgt_connect_check_sptlrpc(struct ptlrpc_request *req,
struct obd_export *exp);
+int tgt_adapt_sptlrpc_conf(struct lu_target *tgt, int initial);
int tgt_connect(struct tgt_session_info *tsi);
int tgt_disconnect(struct tgt_session_info *uti);
int tgt_obd_ping(struct tgt_session_info *tsi);
int tgt_sec_ctx_init_cont(struct tgt_session_info *tsi);
int tgt_sec_ctx_fini(struct tgt_session_info *tsi);
int tgt_sendpage(struct tgt_session_info *tsi, struct lu_rdpg *rdpg, int nob);
+int tgt_validate_obdo(struct tgt_session_info *tsi, struct obdo *oa);
+int tgt_sync(const struct lu_env *env, struct lu_target *tgt,
+ struct dt_object *obj);
+
+int tgt_io_thread_init(struct ptlrpc_thread *thread);
+void tgt_io_thread_done(struct ptlrpc_thread *thread);
+
+int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
+ __u64 start, __u64 end, struct lustre_handle *lh,
+ int mode, __u64 *flags);
+void tgt_extent_unlock(struct lustre_handle *lh, ldlm_mode_t mode);
+int tgt_brw_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
+ struct obd_ioobj *obj, struct niobuf_remote *nb,
+ struct lustre_handle *lh, int mode);
+void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob,
+ struct lustre_handle *lh, int mode);
+int tgt_brw_read(struct tgt_session_info *tsi);
+int tgt_brw_write(struct tgt_session_info *tsi);
+int tgt_hpreq_handler(struct ptlrpc_request *req);
extern struct tgt_handler tgt_sec_ctx_handlers[];
extern struct tgt_handler tgt_obd_handlers[];
int tgt_last_rcvd_update(const struct lu_env *env, struct lu_target *tgt,
struct dt_object *obj, __u64 opdata,
struct thandle *th, struct ptlrpc_request *req);
+int tgt_last_rcvd_update_echo(const struct lu_env *env, struct lu_target *tgt,
+ struct dt_object *obj, struct thandle *th,
+ struct obd_export *exp);
+
enum {
ESERIOUS = 0x0001000
};
return (rc < 0 && -rc & ESERIOUS);
}
+/**
+ * Hide server-side ownership from a remote client.
+ *
+ * If exp_connect_rmtclient() reports true for \a exp, the uid and gid in
+ * \a oa are overwritten with -1 and OBD_MD_FLUID / OBD_MD_FLGID are cleared
+ * from o_valid. For any other export \a oa is left as it is.
+ *
+ * \param[in]     exp	export the reply is sent to
+ * \param[in,out] oa	obdo about to be returned to the client
+ */
+static inline void tgt_drop_id(struct obd_export *exp, struct obdo *oa)
+{
+	/* expected case: nothing has to be hidden */
+	if (!unlikely(exp_connect_rmtclient(exp)))
+		return;
+
+	oa->o_valid &= ~(OBD_MD_FLUID | OBD_MD_FLGID);
+	oa->o_gid = -1;
+	oa->o_uid = -1;
+}
+
/*
* Unified target generic handers macros and generic functions.
*/
TGT_RPC_HANDLER(MDS_FIRST_OPC, flags, name, fn, NULL, \
LUSTRE_MDS_VERSION)
+/* OST request with a format known in advance */
+#define TGT_OST_HDL(flags, name, fn) \
+ TGT_RPC_HANDLER(OST_FIRST_OPC, flags, name, fn, &RQF_ ## name, \
+ LUSTRE_OST_VERSION)
+
/* MGS request with a format known in advance */
#define TGT_MGS_HDL(flags, name, fn) \
TGT_RPC_HANDLER(MGS_FIRST_OPC, flags, name, fn, &RQF_ ## name, \
extern struct req_format RQF_OST_BRW_WRITE;
extern struct req_format RQF_OST_STATFS;
extern struct req_format RQF_OST_SET_GRANT_INFO;
-extern struct req_format RQF_OST_GET_INFO_GENERIC;
+extern struct req_format RQF_OST_GET_INFO;
extern struct req_format RQF_OST_GET_INFO_LAST_ID;
extern struct req_format RQF_OST_GET_INFO_LAST_FID;
extern struct req_format RQF_OST_SET_INFO_LAST_FID;
#define OBD_FAIL_OST_ENOINO 0x229
#define OBD_FAIL_OST_DQACQ_NET 0x230
#define OBD_FAIL_OST_STATFS_EINPROGRESS 0x231
+#define OBD_FAIL_OST_SET_INFO_NET 0x232
#define OBD_FAIL_LDLM 0x300
#define OBD_FAIL_LDLM_NAMESPACE_NEW 0x301
thread->t_env = env;
thread->t_id = -1; /* force filter_iobuf_get/put to use local buffers */
env->le_ctx.lc_thread = thread;
- thread->t_data = NULL;
- thread->t_watchdog = NULL;
+ tgt_io_thread_init(thread); /* init thread_big_cache for IO requests */
+ thread->t_watchdog = NULL;
CDEBUG(D_HA, "%s: started recovery thread pid %d\n", obd->obd_name,
current_pid());
trd->trd_processing_task = 0;
complete(&trd->trd_finishing);
- OBD_FREE_PTR(thread);
- OBD_FREE_PTR(env);
- RETURN(rc);
+ tgt_io_thread_done(thread);
+ OBD_FREE_PTR(thread);
+ OBD_FREE_PTR(env);
+ RETURN(rc);
}
static int target_start_recovery_thread(struct lu_target *lut,
EXIT;
}
-static int mdt_adapt_sptlrpc_conf(struct obd_device *obd, int initial)
-{
- struct mdt_device *m = mdt_dev(obd->obd_lu_dev);
- struct sptlrpc_rule_set tmp_rset;
- int rc;
-
- sptlrpc_rule_set_init(&tmp_rset);
- rc = sptlrpc_conf_target_get_rules(obd, &tmp_rset, initial);
- if (rc) {
- CERROR("mdt %s: failed get sptlrpc rules: %d\n",
- mdt_obd_name(m), rc);
- return rc;
- }
-
- sptlrpc_target_update_exp_flavor(obd, &tmp_rset);
-
- write_lock(&m->mdt_lut.lut_sptlrpc_lock);
- sptlrpc_rule_set_free(&m->mdt_lut.lut_sptlrpc_rset);
- m->mdt_lut.lut_sptlrpc_rset = tmp_rset;
- write_unlock(&m->mdt_lut.lut_sptlrpc_lock);
-
- return 0;
-}
-
int mdt_postrecov(const struct lu_env *, struct mdt_device *);
static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
if (rc)
GOTO(err_tgt, rc);
- mdt_adapt_sptlrpc_conf(obd, 1);
+ tgt_adapt_sptlrpc_conf(&m->mdt_lut, 1);
next = m->mdt_child;
rc = next->md_ops->mdo_iocontrol(env, next, OBD_IOC_GET_MNTOPT, 0,
__u32 vallen, void *val,
struct ptlrpc_request_set *set)
{
- struct obd_device *obd = exp->exp_obd;
- int rc;
- ENTRY;
+ int rc;
- LASSERT(obd);
+ ENTRY;
- if (KEY_IS(KEY_SPTLRPC_CONF)) {
- rc = mdt_adapt_sptlrpc_conf(obd, 0);
- RETURN(rc);
- }
+ if (KEY_IS(KEY_SPTLRPC_CONF)) {
+ rc = tgt_adapt_sptlrpc_conf(class_exp2tgt(exp), 0);
+ RETURN(rc);
+ }
- RETURN(0);
+ RETURN(0);
}
/**
return -EINVAL;
}
+ spin_lock(&mdt->mdt_lut.lut_flags_lock);
mdt->mdt_lut.lut_oss_capa = !!(val & 0x1);
mdt->mdt_lut.lut_mds_capa = !!(val & 0x2);
+ spin_unlock(&mdt->mdt_lut.lut_flags_lock);
mdt->mdt_capa_conf = 1;
LCONSOLE_INFO("MDS %s %s MDS fid capability.\n",
mdt_obd_name(mdt),
/* Return non-zero for a fully connected export */
int class_connected_export(struct obd_export *exp)
{
+ int connected = 0;
+
if (exp) {
- int connected;
spin_lock(&exp->exp_lock);
- connected = (exp->exp_conn_cnt > 0);
+ connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
spin_unlock(&exp->exp_lock);
- return connected;
}
- return 0;
+ return connected;
}
EXPORT_SYMBOL(class_connected_export);
EXPORT_SYMBOL(obdo_from_la);
/*FIXME: Just copy from obdo_from_inode*/
-void la_from_obdo(struct lu_attr *dst, struct obdo *obdo, obd_flag valid)
+void la_from_obdo(struct lu_attr *dst, const struct obdo *obdo, obd_flag valid)
{
__u64 newvalid = 0;
echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
void *karg, void *uarg)
{
+#ifdef HAVE_SERVER_SUPPORT
+ struct tgt_session_info *tsi;
+#endif
struct obd_device *obd = exp->exp_obd;
struct echo_device *ed = obd2echo_dev(obd);
struct echo_client_obd *ec = ed->ed_ec;
int rw = OBD_BRW_READ;
int rc = 0;
int i;
+#ifdef HAVE_SERVER_SUPPORT
+ struct lu_context echo_session;
+#endif
ENTRY;
memset(&dummy_oti, 0, sizeof(dummy_oti));
if (env == NULL)
RETURN(-ENOMEM);
- rc = lu_env_init(env, LCT_DT_THREAD | LCT_MD_THREAD);
- if (rc)
- GOTO(out, rc = -ENOMEM);
+ rc = lu_env_init(env, LCT_DT_THREAD);
+ if (rc)
+ GOTO(out_alloc, rc = -ENOMEM);
+#ifdef HAVE_SERVER_SUPPORT
+ env->le_ses = &echo_session;
+ rc = lu_context_init(env->le_ses, LCT_SERVER_SESSION | LCT_NOREF);
+ if (unlikely(rc < 0))
+ GOTO(out_env, rc);
+ lu_context_enter(env->le_ses);
+
+ tsi = tgt_ses_info(env);
+ tsi->tsi_exp = ec->ec_exp;
+#endif
switch (cmd) {
case OBD_IOC_CREATE: /* may create echo object */
if (!cfs_capable(CFS_CAP_SYS_ADMIN))
EXIT;
out:
+#ifdef HAVE_SERVER_SUPPORT
+ lu_context_exit(env->le_ses);
+ lu_context_fini(env->le_ses);
+out_env:
+#endif
lu_env_fini(env);
+out_alloc:
OBD_FREE_PTR(env);
/* XXX this should be in a helper also called by target_send_reply */
int count, int *eof, void *data)
{
struct obd_device *obd = data;
- struct ofd_device *ofd = ofd_dev(obd->obd_lu_dev);
+ struct lu_target *tgt = obd->u.obt.obt_lut;
int rc;
rc = snprintf(page, count, "%s\n",
- sync_on_cancel_states[ofd->ofd_sync_lock_cancel]);
+ sync_on_cancel_states[tgt->lut_sync_lock_cancel]);
return rc;
}
unsigned long count, void *data)
{
struct obd_device *obd = data;
- struct ofd_device *ofd = ofd_dev(obd->obd_lu_dev);
+ struct lu_target *tgt = obd->u.obt.obt_lut;
int val = -1;
int i;
if (val < 0 || val > 2)
return -EINVAL;
- spin_lock(&ofd->ofd_flags_lock);
- ofd->ofd_sync_lock_cancel = val;
- spin_unlock(&ofd->ofd_flags_lock);
+ spin_lock(&tgt->lut_flags_lock);
+ tgt->lut_sync_lock_cancel = val;
+ spin_unlock(&tgt->lut_flags_lock);
return count;
}
void ofd_stats_counter_init(struct lprocfs_stats *stats)
{
- LASSERT(stats && stats->ls_num == LPROC_OFD_STATS_LAST);
+ LASSERT(stats && stats->ls_num >= LPROC_OFD_STATS_LAST);
+
lprocfs_counter_init(stats, LPROC_OFD_STATS_READ,
- LPROCFS_CNTR_AVGMINMAX, "read", "bytes");
+ LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
lprocfs_counter_init(stats, LPROC_OFD_STATS_WRITE,
- LPROCFS_CNTR_AVGMINMAX, "write", "bytes");
+ LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
+ lprocfs_counter_init(stats, LPROC_OFD_STATS_GETATTR,
+ 0, "getattr", "reqs");
lprocfs_counter_init(stats, LPROC_OFD_STATS_SETATTR,
0, "setattr", "reqs");
lprocfs_counter_init(stats, LPROC_OFD_STATS_PUNCH,
0, "punch", "reqs");
lprocfs_counter_init(stats, LPROC_OFD_STATS_SYNC,
0, "sync", "reqs");
+ lprocfs_counter_init(stats, LPROC_OFD_STATS_DESTROY,
+ 0, "destroy", "reqs");
+ lprocfs_counter_init(stats, LPROC_OFD_STATS_CREATE,
+ 0, "create", "reqs");
+ lprocfs_counter_init(stats, LPROC_OFD_STATS_STATFS,
+ 0, "statfs", "reqs");
+ lprocfs_counter_init(stats, LPROC_OFD_STATS_GET_INFO,
+ 0, "get_info", "reqs");
+ lprocfs_counter_init(stats, LPROC_OFD_STATS_SET_INFO,
+ 0, "set_info", "reqs");
+ lprocfs_counter_init(stats, LPROC_OFD_STATS_QUOTACTL,
+ 0, "quotactl", "reqs");
}
+
#endif /* LPROCFS */
#include <lustre_param.h>
#include <lustre_fid.h>
#include <lustre_lfsck.h>
+#include <lustre/lustre_idl.h>
+#include <lustre_dlm.h>
+#include <lustre_quota.h>
#include "ofd_internal.h"
ENTRY;
lu_site_purge(env, top->ld_site, ~0);
-
/* process cleanup, pass mdt obd name to get obd umount flags */
lustre_cfg_bufs_reset(&bufs, obd->obd_name);
if (obd->obd_force)
lustre_cfg_free(lcfg);
lu_site_purge(env, top->ld_site, ~0);
+ if (!cfs_hash_is_empty(top->ld_site->ls_obj_hash)) {
+ LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
+ lu_site_print(env, top->ld_site, &msgdata, lu_cdebug_printer);
+ }
LASSERT(m->ofd_osd_exp);
obd_disconnect(m->ofd_osd_exp);
rc = 0;
}
- target_recovery_init(&ofd->ofd_lut, ost_handle);
+ target_recovery_init(&ofd->ofd_lut, tgt_request_handle);
LASSERT(obd->obd_no_conn);
spin_lock(&obd->obd_dev_lock);
obd->obd_no_conn = 0;
RETURN(rc);
}
- rc = lprocfs_alloc_obd_stats(obd, LPROC_OFD_LAST);
+ rc = lprocfs_alloc_obd_stats(obd, LPROC_OFD_STATS_LAST);
if (rc) {
CERROR("%s: lprocfs_alloc_obd_stats failed: %d.\n",
obd->obd_name, rc);
GOTO(obd_cleanup, rc);
}
- /* Init OFD private stats here */
- lprocfs_counter_init(obd->obd_stats, LPROC_OFD_READ_BYTES,
- LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
- lprocfs_counter_init(obd->obd_stats, LPROC_OFD_WRITE_BYTES,
- LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
-
obd->obd_uses_nid_stats = 1;
entry = lprocfs_register("exports", obd->obd_proc_entry, NULL, NULL);
GOTO(obd_cleanup, rc);
}
+ ofd_stats_counter_init(obd->obd_stats);
+
rc = lprocfs_job_stats_init(obd, LPROC_OFD_STATS_LAST,
ofd_stats_counter_init);
if (rc)
return rc;
}
+/**
+ * Handle a set_info request.
+ *
+ * Reads the key and value buffers from the request capsule, packs the
+ * reply and dispatches on the key:
+ *  - KEY_GRANT_SHRINK: the request carries an ost_body; it is copied into
+ *    the reply and passed to ofd_grant_prepare_read();
+ *  - KEY_EVICT_BY_NID: evicts exports by the NID given in the value;
+ *  - KEY_CAPA_KEY:     forwards the value to ofd_update_capa_key();
+ *  - KEY_SPTLRPC_CONF: re-reads the sptlrpc rules of the target;
+ *  - anything else is rejected with -EOPNOTSUPP.
+ *
+ * \param[in] tsi	target session info of the request
+ *
+ * \retval 0		on success
+ * \retval -EOPNOTSUPP	unsupported key
+ * \retval -ENOMEM	reply body is not available
+ * \retval negative	err_serious(-EFAULT) when key, value or request body
+ *			is missing, or an error from reply packing or from
+ *			the per-key handler
+ */
+int ofd_set_info_hdl(struct tgt_session_info *tsi)
+{
+	struct ptlrpc_request	*req = tgt_ses_req(tsi);
+	struct ost_body		*body = NULL, *repbody;
+	/* NOTE(review): KEY_IS() presumably expands to code reading the
+	 * locals 'key' and 'keylen' - keep these names. */
+	void			*key, *val = NULL;
+	int			 keylen, vallen, rc = 0;
+	bool			 is_grant_shrink;
+	struct ofd_device	*ofd = ofd_exp(tsi->tsi_exp);
+
+	ENTRY;
+
+	key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
+	if (key == NULL) {
+		DEBUG_REQ(D_HA, req, "no set_info key");
+		RETURN(err_serious(-EFAULT));
+	}
+	keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
+				      RCL_CLIENT);
+
+	val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
+	if (val == NULL) {
+		DEBUG_REQ(D_HA, req, "no set_info val");
+		RETURN(err_serious(-EFAULT));
+	}
+	vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
+				      RCL_CLIENT);
+
+	is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK);
+	if (is_grant_shrink)
+		/* In this case the value is actually an RMF_OST_BODY, so we
+		 * transmutate the type of this PTLRPC */
+		req_capsule_extend(tsi->tsi_pill, &RQF_OST_SET_GRANT_INFO);
+
+	rc = req_capsule_server_pack(tsi->tsi_pill);
+	if (rc < 0)
+		RETURN(rc);
+
+	if (is_grant_shrink) {
+		body = req_capsule_client_get(tsi->tsi_pill, &RMF_OST_BODY);
+		if (body == NULL) {
+			DEBUG_REQ(D_HA, req, "no set_info body");
+			RETURN(err_serious(-EFAULT));
+		}
+
+		repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+		if (repbody == NULL)
+			RETURN(-ENOMEM);
+
+		/* both buffers are known to be valid, echo the request */
+		*repbody = *body;
+
+		/** handle grant shrink, similar to a read request */
+		ofd_grant_prepare_read(tsi->tsi_env, tsi->tsi_exp,
+				       &repbody->oa);
+	} else if (KEY_IS(KEY_EVICT_BY_NID)) {
+		if (vallen > 0)
+			obd_export_evict_by_nid(tsi->tsi_exp->exp_obd, val);
+		rc = 0;
+	} else if (KEY_IS(KEY_CAPA_KEY)) {
+		rc = ofd_update_capa_key(ofd, val);
+	} else if (KEY_IS(KEY_SPTLRPC_CONF)) {
+		rc = tgt_adapt_sptlrpc_conf(tsi->tsi_tgt, 0);
+	} else {
+		CERROR("%s: Unsupported key %s\n",
+		       tgt_name(tsi->tsi_tgt), (char *)key);
+		rc = -EOPNOTSUPP;
+	}
+	/* accounted for every request that got past reply packing */
+	ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_SET_INFO,
+			 tsi->tsi_jobid, 1);
+
+	RETURN(rc);
+}
+
+/**
+ * Fetch the FIEMAP extent mapping of one object.
+ *
+ * The object is looked up by \a fid and, while its read lock is held,
+ * dt_fiemap_get() is called on the child dt object to fill \a fiemap.
+ *
+ * \param[in]     env		execution environment
+ * \param[in]     ofd		OFD device owning the object
+ * \param[in]     fid		FID of the object
+ * \param[in,out] fiemap	fiemap request/result buffer
+ *
+ * \retval -ENOENT	the object does not exist
+ * \retval PTR_ERR()	of the lookup when the object cannot be found
+ * \retval other	whatever dt_fiemap_get() returned
+ */
+static int ofd_fiemap_get(const struct lu_env *env, struct ofd_device *ofd,
+			  struct lu_fid *fid, struct ll_user_fiemap *fiemap)
+{
+	struct ofd_object	*obj = ofd_object_find(env, ofd, fid);
+	int			 result = -ENOENT;
+
+	if (IS_ERR(obj)) {
+		CERROR("%s: error finding object "DFID"\n",
+		       ofd_name(ofd), PFID(fid));
+		return PTR_ERR(obj);
+	}
+
+	ofd_read_lock(env, obj);
+	/* result stays -ENOENT unless the object really exists */
+	if (ofd_object_exists(obj))
+		result = dt_fiemap_get(env, ofd_object_child(obj), fiemap);
+	ofd_read_unlock(env, obj);
+
+	ofd_object_put(env, obj);
+	return result;
+}
+/**
+ * Bookkeeping entry for one extent lock taken by lock_region().
+ * Entries are chained on a caller-provided list and are released and
+ * freed again by unlock_zero_regions().
+ */
+struct locked_region {
+ cfs_list_t list; /* linkage into the caller's list of locked regions */
+ struct lustre_handle lh; /* handle passed to tgt_extent_lock()/unlock() */
+};
+
+/**
+ * Take a LCK_PR extent lock on [\a begin, \a end] and record it.
+ *
+ * A tracking entry is allocated for the lock. On success the entry is
+ * linked onto \a locked so that unlock_zero_regions() can drop the lock
+ * and free the entry later. On failure nothing is added to \a locked and
+ * the entry is freed here.
+ *
+ * \param[in]     ns		namespace to lock in
+ * \param[in]     res_id	resource to lock
+ * \param[in]     begin		first byte of the extent
+ * \param[in]     end		last byte of the extent, must be >= \a begin
+ * \param[in,out] locked	list collecting the locked regions
+ *
+ * \retval 0		lock taken and queued on \a locked
+ * \retval -ENOMEM	tracking entry could not be allocated
+ * \retval other	error returned by tgt_extent_lock()
+ */
+static int lock_region(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
+		       unsigned long long begin, unsigned long long end,
+		       cfs_list_t *locked)
+{
+	struct locked_region	*region = NULL;
+	__u64			 flags = 0;
+	int			 rc;
+
+	LASSERT(begin <= end);
+	OBD_ALLOC_PTR(region);
+	if (region == NULL)
+		return -ENOMEM;
+
+	rc = tgt_extent_lock(ns, res_id, begin, end, &region->lh,
+			     LCK_PR, &flags);
+	if (rc != 0) {
+		/* the entry was never published on the list, so nobody
+		 * else can free it - release it here to avoid a leak */
+		OBD_FREE_PTR(region);
+		return rc;
+	}
+
+	CDEBUG(D_OTHER, "ost lock [%llu,%llu], lh=%p\n", begin, end,
+	       &region->lh);
+	cfs_list_add(&region->list, locked);
+
+	return 0;
+}
+
+/**
+ * Lock every part of the requested fiemap range not covered by an extent.
+ *
+ * Walks the fm_mapped_extents extents of \a fiemap in order and calls
+ * lock_region() for each gap between the current position and the start
+ * of the next extent, then for the tail between the end of the last extent
+ * and fm_start + fm_length. Taken locks are accumulated on \a locked.
+ *
+ * The walk stops at the first lock_region() failure; regions locked before
+ * that stay on \a locked.
+ *
+ * \retval 0		all gaps were locked (or there were none)
+ * \retval negative	error returned by lock_region()
+ */
+static int lock_zero_regions(struct ldlm_namespace *ns,
+			     struct ldlm_res_id *res_id,
+			     struct ll_user_fiemap *fiemap,
+			     cfs_list_t *locked)
+{
+	struct ll_fiemap_extent	*ext = fiemap->fm_extents;
+	__u64			 gap_start = fiemap->fm_start;
+	__u64			 range_end;
+	unsigned int		 idx;
+	int			 rc = 0;
+
+	ENTRY;
+
+	CDEBUG(D_OTHER, "extents count %u\n", fiemap->fm_mapped_extents);
+	for (idx = 0; idx < fiemap->fm_mapped_extents; idx++, ext++) {
+		if (ext->fe_logical > gap_start) {
+			CDEBUG(D_OTHER, "ost lock [%llu,%llu]\n",
+			       gap_start, ext->fe_logical);
+			rc = lock_region(ns, res_id, gap_start,
+					 ext->fe_logical, locked);
+			if (rc)
+				RETURN(rc);
+		}
+
+		/* continue right behind the extent just visited */
+		gap_start = ext->fe_logical + ext->fe_length;
+	}
+
+	range_end = fiemap->fm_start + fiemap->fm_length;
+	if (gap_start < range_end) {
+		CDEBUG(D_OTHER, "ost lock [%llu,%llu]\n",
+		       gap_start, range_end);
+		rc = lock_region(ns, res_id, gap_start, range_end, locked);
+	}
+
+	RETURN(rc);
+}
+
+/**
+ * Release every region recorded on \a locked.
+ *
+ * For each entry the LCK_PR extent lock is dropped, the entry is unlinked
+ * from the list and its memory is freed. \a ns is not used by the body.
+ */
+static void unlock_zero_regions(struct ldlm_namespace *ns, cfs_list_t *locked)
+{
+	struct locked_region *region;
+	struct locked_region *next;
+
+	/* the _safe iterator is needed because entries are freed in the loop */
+	cfs_list_for_each_entry_safe(region, next, locked, list) {
+		struct lustre_handle *lockh = &region->lh;
+
+		CDEBUG(D_OTHER, "ost unlock lh=%p\n", lockh);
+		tgt_extent_unlock(lockh, LCK_PR);
+		cfs_list_del(&region->list);
+		OBD_FREE_PTR(region);
+	}
+}
+
+/**
+ * Handle a get_info request.
+ *
+ * The key is read from the request capsule and selects the operation:
+ *  - KEY_LAST_ID:  returns the last object id of the export's group;
+ *  - KEY_FIEMAP:   returns the extent mapping of an object; when the client
+ *                  set OBD_FL_SRVLOCK the unmapped parts of the range are
+ *                  locked and the mapping is fetched a second time;
+ *  - KEY_LAST_FID: returns the last FID of the requested sequence;
+ *  - anything else is rejected with -EOPNOTSUPP.
+ *
+ * \param[in] tsi	target session info of the request
+ *
+ * \retval 0		on success
+ * \retval -EOPNOTSUPP	unsupported key
+ * \retval negative	other error, flagged with err_serious() when the
+ *			request is malformed or the reply cannot be packed
+ */
+int ofd_get_info_hdl(struct tgt_session_info *tsi)
+{
+	struct obd_export		*exp = tsi->tsi_exp;
+	struct ofd_device		*ofd = ofd_exp(exp);
+	struct ofd_thread_info		*fti = tsi2ofd_info(tsi);
+	/* NOTE(review): KEY_IS() presumably expands to code reading the
+	 * locals 'key' and 'keylen' - keep these names. */
+	void				*key;
+	int				 keylen;
+	int				 replylen, rc = 0;
+
+	ENTRY;
+
+	/* this common part for get_info rpc */
+	key = req_capsule_client_get(tsi->tsi_pill, &RMF_GETINFO_KEY);
+	if (key == NULL) {
+		DEBUG_REQ(D_HA, tgt_ses_req(tsi), "no get_info key");
+		RETURN(err_serious(-EPROTO));
+	}
+	keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_GETINFO_KEY,
+				      RCL_CLIENT);
+
+	if (KEY_IS(KEY_LAST_ID)) {
+		obd_id		*last_id;
+		struct ofd_seq	*oseq;
+
+		req_capsule_extend(tsi->tsi_pill, &RQF_OST_GET_INFO_LAST_ID);
+		rc = req_capsule_server_pack(tsi->tsi_pill);
+		if (rc)
+			RETURN(err_serious(rc));
+
+		last_id = req_capsule_server_get(tsi->tsi_pill, &RMF_OBD_ID);
+
+		oseq = ofd_seq_load(tsi->tsi_env, ofd,
+				    (obd_seq)exp->exp_filter_data.fed_group);
+		if (IS_ERR(oseq)) {
+			/* nothing was loaded: there is no reference to put,
+			 * and an ERR_PTR must not reach ofd_seq_put() */
+			rc = -EFAULT;
+		} else {
+			*last_id = ofd_seq_last_oid(oseq);
+			ofd_seq_put(tsi->tsi_env, oseq);
+		}
+	} else if (KEY_IS(KEY_FIEMAP)) {
+		struct ll_fiemap_info_key	*fm_key;
+		struct ll_user_fiemap		*fiemap;
+		struct lu_fid			*fid = &fti->fti_fid;
+
+		req_capsule_extend(tsi->tsi_pill, &RQF_OST_GET_INFO_FIEMAP);
+
+		fm_key = req_capsule_client_get(tsi->tsi_pill, &RMF_FIEMAP_KEY);
+		if (fm_key == NULL)
+			RETURN(err_serious(-EPROTO));
+
+		rc = tgt_validate_obdo(tsi, &fm_key->oa);
+		if (rc)
+			RETURN(err_serious(rc));
+
+		/* the reply buffer is sized by the extent count the client
+		 * asked for */
+		replylen = fiemap_count_to_size(fm_key->fiemap.fm_extent_count);
+		req_capsule_set_size(tsi->tsi_pill, &RMF_FIEMAP_VAL,
+				     RCL_SERVER, replylen);
+
+		rc = req_capsule_server_pack(tsi->tsi_pill);
+		if (rc)
+			RETURN(err_serious(rc));
+
+		fiemap = req_capsule_server_get(tsi->tsi_pill, &RMF_FIEMAP_VAL);
+		if (fiemap == NULL)
+			RETURN(-ENOMEM);
+
+		rc = ostid_to_fid(fid, &fm_key->oa.o_oi, 0);
+		if (rc != 0)
+			RETURN(rc);
+
+		CDEBUG(D_INODE, "get FIEMAP of object "DFID"\n", PFID(fid));
+
+		*fiemap = fm_key->fiemap;
+		rc = ofd_fiemap_get(tsi->tsi_env, ofd, fid, fiemap);
+
+		/* LU-3219: Lock the sparse areas to make sure dirty
+		 * flushed back from client, then call fiemap again.
+		 * Only do so when the first pass succeeded: otherwise its
+		 * error would be overwritten and the extent array walked by
+		 * lock_zero_regions() would not have been filled in. */
+		if (rc == 0 && fm_key->oa.o_valid & OBD_MD_FLFLAGS &&
+		    fm_key->oa.o_flags & OBD_FL_SRVLOCK) {
+			cfs_list_t locked = CFS_LIST_HEAD_INIT(locked);
+
+			ost_fid_build_resid(fid, &fti->fti_resid);
+			rc = lock_zero_regions(ofd->ofd_namespace,
+					       &fti->fti_resid, fiemap,
+					       &locked);
+			if (rc == 0 && !cfs_list_empty(&locked))
+				rc = ofd_fiemap_get(tsi->tsi_env, ofd, fid,
+						    fiemap);
+			/* always release what got locked, including regions
+			 * taken before lock_zero_regions() failed; this is
+			 * a no-op on an empty list */
+			unlock_zero_regions(ofd->ofd_namespace, &locked);
+		}
+	} else if (KEY_IS(KEY_LAST_FID)) {
+		struct ofd_seq	*oseq;
+		struct lu_fid	*fid;
+
+		/* 'rc' and 'ofd' of the function scope are used on purpose:
+		 * a branch-local 'rc' would hide failures from RETURN(rc)
+		 * at the end of the function */
+		req_capsule_extend(tsi->tsi_pill, &RQF_OST_GET_INFO_LAST_FID);
+		rc = req_capsule_server_pack(tsi->tsi_pill);
+		if (rc)
+			RETURN(err_serious(rc));
+
+		fid = req_capsule_client_get(tsi->tsi_pill, &RMF_FID);
+		if (fid == NULL)
+			RETURN(err_serious(-EPROTO));
+
+		fid_le_to_cpu(&fti->fti_ostid.oi_fid, fid);
+
+		/* from here on 'fid' points into the reply */
+		fid = req_capsule_server_get(tsi->tsi_pill, &RMF_FID);
+		if (fid == NULL)
+			RETURN(-ENOMEM);
+
+		oseq = ofd_seq_load(tsi->tsi_env, ofd,
+				    ostid_seq(&fti->fti_ostid));
+		if (IS_ERR(oseq))
+			RETURN(PTR_ERR(oseq));
+
+		rc = ostid_to_fid(fid, &oseq->os_oi,
+				  ofd->ofd_lut.lut_lsd.lsd_osd_index);
+		if (rc != 0)
+			GOTO(out_put, rc);
+
+		CDEBUG(D_HA, "%s: LAST FID is "DFID"\n", ofd_name(ofd),
+		       PFID(fid));
+out_put:
+		ofd_seq_put(tsi->tsi_env, oseq);
+	} else {
+		CERROR("%s: not supported key %s\n", tgt_name(tsi->tsi_tgt),
+		       (char *)key);
+		rc = -EOPNOTSUPP;
+	}
+	ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_GET_INFO,
+			 tsi->tsi_jobid, 1);
+
+	RETURN(rc);
+}
+
+/**
+ * Handle a getattr request.
+ *
+ * Copies the object id from the request into the reply, optionally takes
+ * a server-side extent lock over the whole object (LCK_PR, or LCK_PW when
+ * the client set OBD_FL_FLUSH) if OBD_FL_SRVLOCK was requested, and fills
+ * the reply obdo from the object attributes. The object data version is
+ * added unless dt_version_get() reports -EOPNOTSUPP. Unless the function
+ * returned early, the reply o_flags is set to OBD_FL_FLUSH.
+ *
+ * \param[in] tsi	target session info; tsi_ost_body must be set
+ *
+ * \retval 0		on success
+ * \retval -ENOMEM	reply body is not available
+ * \retval negative	error from locking, object lookup or ofd_attr_get()
+ */
+static int ofd_getattr_hdl(struct tgt_session_info *tsi)
+{
+	struct ofd_thread_info	*info = tsi2ofd_info(tsi);
+	struct ofd_device	*ofd = ofd_exp(tsi->tsi_exp);
+	struct ost_body		*reply;
+	const struct obdo	*req_oa;
+	struct ofd_object	*obj;
+	struct lustre_handle	 lockh = { 0 };
+	ldlm_mode_t		 mode = LCK_PR;
+	__u64			 lock_flags = 0;
+	bool			 need_lock;
+	int			 rc;
+
+	ENTRY;
+
+	LASSERT(tsi->tsi_ost_body != NULL);
+	req_oa = &tsi->tsi_ost_body->oa;
+
+	reply = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+	if (reply == NULL)
+		RETURN(-ENOMEM);
+
+	reply->oa.o_oi = req_oa->o_oi;
+	reply->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+	/* o_flags is only meaningful when OBD_MD_FLFLAGS is set */
+	need_lock = (req_oa->o_valid & OBD_MD_FLFLAGS) &&
+		    (req_oa->o_flags & OBD_FL_SRVLOCK);
+
+	if (need_lock) {
+		if (unlikely(req_oa->o_flags & OBD_FL_FLUSH))
+			mode = LCK_PW;
+
+		rc = tgt_extent_lock(tsi->tsi_tgt->lut_obd->obd_namespace,
+				     &tsi->tsi_resid, 0, OBD_OBJECT_EOF, &lockh,
+				     mode, &lock_flags);
+		if (rc != 0)
+			RETURN(rc);
+	}
+
+	obj = ofd_object_find_exists(tsi->tsi_env, ofd, &tsi->tsi_fid);
+	if (IS_ERR(obj))
+		GOTO(out, rc = PTR_ERR(obj));
+
+	rc = ofd_attr_get(tsi->tsi_env, obj, &info->fti_attr);
+	if (rc == 0) {
+		__u64 version;
+
+		obdo_from_la(&reply->oa, &info->fti_attr,
+			     OFD_VALID_FLAGS | LA_UID | LA_GID);
+		tgt_drop_id(tsi->tsi_exp, &reply->oa);
+
+		/* Store object version in reply */
+		version = dt_version_get(tsi->tsi_env, ofd_object_child(obj));
+		if ((__s64)version != -EOPNOTSUPP) {
+			reply->oa.o_valid |= OBD_MD_FLDATAVERSION;
+			reply->oa.o_data_version = version;
+		}
+	}
+
+	ofd_object_put(tsi->tsi_env, obj);
+out:
+	if (need_lock)
+		tgt_extent_unlock(&lockh, mode);
+
+	ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_GETATTR,
+			 tsi->tsi_jobid, 1);
+
+	reply->oa.o_valid |= OBD_MD_FLFLAGS;
+	reply->oa.o_flags = OBD_FL_FLUSH;
+
+	RETURN(rc);
+}
+
+/**
+ * Handle a setattr request.
+ *
+ * Rejects requests that carry a size other than OBD_OBJECT_EOF (they would
+ * truncate the object), applies the attributes from the request obdo to
+ * the object and returns the resulting attributes in the reply. On success
+ * the LVB of the object's LDLM resource is refreshed.
+ *
+ * \param[in] tsi	target session info; tsi_ost_body must be set
+ *
+ * \retval 0		on success
+ * \retval -ENOMEM	reply body is not available
+ * \retval -EPERM	the request tries to change the object size
+ * \retval negative	error from object lookup or ofd_attr_set()
+ */
+static int ofd_setattr_hdl(struct tgt_session_info *tsi)
+{
+	struct ofd_thread_info	*fti = tsi2ofd_info(tsi);
+	struct ofd_device	*ofd = ofd_exp(tsi->tsi_exp);
+	struct ost_body		*body = tsi->tsi_ost_body;
+	struct ost_body		*repbody;
+	struct ldlm_resource	*res;
+	struct ofd_object	*fo;
+	struct filter_fid	*ff = NULL;
+	int			 rc = 0;
+
+	ENTRY;
+
+	LASSERT(body != NULL);
+
+	repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+	if (repbody == NULL)
+		RETURN(-ENOMEM);
+
+	repbody->oa.o_oi = body->oa.o_oi;
+	repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+
+	/* This would be very bad - accidentally truncating a file when
+	 * changing the time or similar - bug 12203. */
+	if (body->oa.o_valid & OBD_MD_FLSIZE &&
+	    body->oa.o_size != OBD_OBJECT_EOF) {
+		/* keep the text in per-call storage rather than in a static
+		 * buffer that every invocation of this handler would share */
+		char mdsinum[48];
+
+		if (body->oa.o_valid & OBD_MD_FLFID)
+			/* snprintf() always NUL-terminates within the size
+			 * it is given, so pass the whole buffer */
+			snprintf(mdsinum, sizeof(mdsinum),
+				 "of parent "DFID, body->oa.o_parent_seq,
+				 body->oa.o_parent_oid, 0);
+		else
+			mdsinum[0] = '\0';
+
+		CERROR("%s: setattr from %s is trying to truncate object "DFID
+		       " %s\n", ofd_name(ofd), obd_export_nid2str(tsi->tsi_exp),
+		       PFID(&tsi->tsi_fid), mdsinum);
+		RETURN(-EPERM);
+	}
+
+	fo = ofd_object_find_exists(tsi->tsi_env, ofd, &tsi->tsi_fid);
+	if (IS_ERR(fo))
+		GOTO(out, rc = PTR_ERR(fo));
+
+	la_from_obdo(&fti->fti_attr, &body->oa, body->oa.o_valid);
+	/* the object type is never changed by setattr */
+	fti->fti_attr.la_valid &= ~LA_TYPE;
+
+	if (body->oa.o_valid & OBD_MD_FLFID) {
+		ff = &fti->fti_mds_fid;
+		ofd_prepare_fidea(ff, &body->oa);
+	}
+
+	/* setting objects attributes (including owner/group) */
+	rc = ofd_attr_set(tsi->tsi_env, fo, &fti->fti_attr, ff);
+	if (rc != 0)
+		GOTO(out_put, rc);
+
+	obdo_from_la(&repbody->oa, &fti->fti_attr,
+		     OFD_VALID_FLAGS | LA_UID | LA_GID);
+	tgt_drop_id(tsi->tsi_exp, &repbody->oa);
+
+	ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_SETATTR,
+			 tsi->tsi_jobid, 1);
+	EXIT;
+out_put:
+	ofd_object_put(tsi->tsi_env, fo);
+out:
+	if (rc == 0) {
+		/* we do not call this before to avoid lu_object_find() in
+		 * ->lvbo_update() holding another reference on the object.
+		 * otherwise concurrent destroy can make the object unavailable
+		 * for 2nd lu_object_find() waiting for the first reference
+		 * to go... deadlock! */
+		res = ldlm_resource_get(ofd->ofd_namespace, NULL,
+					&tsi->tsi_resid, LDLM_EXTENT, 0);
+		if (res != NULL) {
+			ldlm_res_lvbo_update(res, NULL, 0);
+			ldlm_resource_putref(res);
+		}
+	}
+	return rc;
+}
+/**
+ * Handle an object create request.
+ *
+ * The request flavour is selected by o_flags of the request obdo (only
+ * looked at when OBD_MD_FLFLAGS is set in o_valid):
+ *  - OBD_FL_RECREATE_OBJS: nothing is created here; the request is only
+ *    checked and fails with -EINVAL when the device is not recovering or
+ *    the id is beyond the last id of the sequence;
+ *  - OBD_FL_DELORPHAN: with os_create_lock held, ofd_orphans_destroy() is
+ *    called when the requested id is below the last id of the sequence by
+ *    no more than OST_MAX_PRECREATE;
+ *  - otherwise: objects are precreated up to the requested id.
+ *
+ * Whenever the requested id is above the last id (diff > 0), objects are
+ * created in batches by ofd_precreate_objects() until all of them exist,
+ * an error occurs or DISK_TIMEOUT has elapsed. If at least one object was
+ * created the request succeeds. The reply carries the last id of the
+ * sequence, and OBD_MD_FLID | OBD_MD_FLGROUP are set in its o_valid when
+ * rc is 0.
+ *
+ * \retval 0		on success
+ * \retval -EROFS	OBD_FAIL_OST_EROFS fail check is active
+ * \retval -ENOMEM	reply body is not available
+ * \retval -EINVAL	sequence cannot be loaded or the request is invalid
+ * \retval negative	other error from orphan destroy or precreation
+ */
+static int ofd_create_hdl(struct tgt_session_info *tsi)
+{
+ struct ost_body *repbody;
+ const struct obdo *oa = &tsi->tsi_ost_body->oa;
+ struct obdo *rep_oa;
+ struct ofd_device *ofd = ofd_exp(tsi->tsi_exp);
+ obd_seq seq = ostid_seq(&oa->o_oi);
+ obd_id oid = ostid_id(&oa->o_oi);
+ struct ofd_seq *oseq;
+ int rc = 0, diff; /* diff: requested id minus last id of the sequence */
+ int sync_trans = 0; /* passed on to ofd_precreate_objects() */
+
+ ENTRY;
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
+ RETURN(-EROFS);
+
+ repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+ if (repbody == NULL)
+ RETURN(-ENOMEM);
+
+ rep_oa = &repbody->oa;
+ rep_oa->o_oi = oa->o_oi;
+
+ LASSERT(seq >= FID_SEQ_OST_MDT0);
+ LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+
+ CDEBUG(D_INFO, "ofd_create("DOSTID")\n", POSTID(&oa->o_oi));
+
+ oseq = ofd_seq_load(tsi->tsi_env, ofd, seq);
+ if (IS_ERR(oseq)) {
+ CERROR("%s: Can't find FID Sequence "LPX64": rc = %ld\n",
+ ofd_name(ofd), seq, PTR_ERR(oseq));
+ RETURN(-EINVAL);
+ }
+
+ if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+ (oa->o_flags & OBD_FL_RECREATE_OBJS)) {
+ if (!ofd_obd(ofd)->obd_recovering ||
+ oid > ofd_seq_last_oid(oseq)) {
+ CERROR("%s: recreate objid "DOSTID" > last id "LPU64
+ "\n", ofd_name(ofd), POSTID(&oa->o_oi),
+ ofd_seq_last_oid(oseq));
+ GOTO(out_nolock, rc = -EINVAL);
+ }
+ /* Do nothing here, we re-create objects during recovery
+ * upon write replay, see ofd_preprw_write() */
+ GOTO(out_nolock, rc = 0);
+ }
+ /* former ofd_handle_precreate */
+ if ((oa->o_valid & OBD_MD_FLFLAGS) &&
+ (oa->o_flags & OBD_FL_DELORPHAN)) {
+ /* destroy orphans */
+ if (lustre_msg_get_conn_cnt(tgt_ses_req(tsi)->rq_reqmsg) <
+ tsi->tsi_exp->exp_conn_cnt) {
+ CERROR("%s: dropping old orphan cleanup request\n",
+ ofd_name(ofd));
+ GOTO(out_nolock, rc = 0);
+ }
+ /* This causes inflight precreates to abort and drop lock */
+ oseq->os_destroys_in_progress = 1;
+ mutex_lock(&oseq->os_create_lock);
+ if (!oseq->os_destroys_in_progress) {
+ CERROR("%s:["LPU64"] destroys_in_progress already"
+ " cleared\n", ofd_name(ofd), seq);
+ ostid_set_id(&rep_oa->o_oi, ofd_seq_last_oid(oseq));
+ GOTO(out, rc = 0);
+ }
+ diff = oid - ofd_seq_last_oid(oseq);
+ CDEBUG(D_HA, "ofd_last_id() = "LPU64" -> diff = %d\n",
+ ofd_seq_last_oid(oseq), diff);
+ if (-diff > OST_MAX_PRECREATE) {
+ /* FIXME: should reset precreate_next_id on MDS */
+ rc = 0;
+ } else if (diff < 0) {
+ rc = ofd_orphans_destroy(tsi->tsi_env, tsi->tsi_exp,
+ ofd, rep_oa);
+ oseq->os_destroys_in_progress = 0;
+ } else {
+ /* XXX: Used by MDS for the first time! */
+ oseq->os_destroys_in_progress = 0;
+ }
+ } else {
+ mutex_lock(&oseq->os_create_lock);
+ if (lustre_msg_get_conn_cnt(tgt_ses_req(tsi)->rq_reqmsg) <
+ tsi->tsi_exp->exp_conn_cnt) {
+ CERROR("%s: dropping old precreate request\n",
+ ofd_name(ofd));
+ GOTO(out, rc = 0);
+ }
+ /* only precreate if seq is 0, IDIF or normal and also o_id
+ * must be specified */
+ if ((!fid_seq_is_mdt(seq) && !fid_seq_is_norm(seq) &&
+ !fid_seq_is_idif(seq)) || oid == 0) {
+ diff = 1; /* shouldn't we create this right now? */
+ } else {
+ diff = oid - ofd_seq_last_oid(oseq);
+ /* Do sync create if the seq is about to be used up */
+ if (fid_seq_is_idif(seq) || fid_seq_is_mdt0(seq)) {
+ if (unlikely(oid >= IDIF_MAX_OID - 1))
+ sync_trans = 1;
+ } else if (fid_seq_is_norm(seq)) {
+ if (unlikely(oid >=
+ LUSTRE_DATA_SEQ_MAX_WIDTH - 1))
+ sync_trans = 1;
+ } else {
+ CERROR("%s : invalid o_seq "DOSTID"\n",
+ ofd_name(ofd), POSTID(&oa->o_oi));
+ GOTO(out, rc = -EINVAL);
+ }
+ }
+ }
+ if (diff > 0) { /* os_create_lock is held on every path reaching here */
+ cfs_time_t enough_time = cfs_time_shift(DISK_TIMEOUT);
+ obd_id next_id;
+ int created = 0; /* objects created so far by the loop below */
+ int count; /* size of the current batch */
+
+ if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
+ !(oa->o_flags & OBD_FL_DELORPHAN)) {
+ /* don't enforce grant during orphan recovery */
+ rc = ofd_grant_create(tsi->tsi_env,
+ ofd_obd(ofd)->obd_self_export,
+ &diff);
+ if (rc) {
+ CDEBUG(D_HA, "%s: failed to acquire grant "
+ "space for precreate (%d): rc = %d\n",
+ ofd_name(ofd), diff, rc);
+ diff = 0; /* skips the precreate loop below */
+ }
+ }
+
+ /* This can happen if a new OST is formatted and installed
+ * in place of an old one at the same index. Instead of
+ * precreating potentially millions of deleted old objects
+ * (possibly filling the OST), only precreate the last batch.
+ * LFSCK will eventually clean up any orphans. LU-14 */
+ if (diff > 5 * OST_MAX_PRECREATE) {
+ diff = OST_MAX_PRECREATE / 2;
+ LCONSOLE_WARN("%s: precreate FID "DOSTID" is over %u "
+ "larger than the LAST_ID "DOSTID", only "
+ "precreating the last %u objects.\n",
+ ofd_name(ofd), POSTID(&oa->o_oi),
+ 5 * OST_MAX_PRECREATE,
+ POSTID(&oseq->os_oi), diff);
+ ofd_seq_last_oid_set(oseq, ostid_id(&oa->o_oi) - diff);
+ }
+
+ while (diff > 0) {
+ next_id = ofd_seq_last_oid(oseq) + 1;
+ count = ofd_precreate_batch(ofd, diff);
+
+ CDEBUG(D_HA, "%s: reserve %d objects in group "LPX64
+ " at "LPU64"\n", ofd_name(ofd),
+ count, seq, next_id);
+
+ if (cfs_time_after(jiffies, enough_time)) {
+ LCONSOLE_WARN("%s: Slow creates, %d/%d objects"
+ " created at a rate of %d/s\n",
+ ofd_name(ofd), created,
+ diff + created,
+ created / DISK_TIMEOUT);
+ break;
+ }
+
+ rc = ofd_precreate_objects(tsi->tsi_env, ofd, next_id,
+ oseq, count, sync_trans);
+ if (rc > 0) { /* a positive rc is added to the created count */
+ created += rc;
+ diff -= rc;
+ } else if (rc < 0) {
+ break;
+ }
+ }
+ if (created > 0)
+ /* some objects got created, we can return
+ * them, even if last creation failed */
+ rc = 0;
+ else
+ CERROR("%s: unable to precreate: rc = %d\n",
+ ofd_name(ofd), rc);
+
+ if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
+ !(oa->o_flags & OBD_FL_DELORPHAN))
+ ofd_grant_commit(tsi->tsi_env,
+ ofd_obd(ofd)->obd_self_export, rc);
+
+ ostid_set_id(&rep_oa->o_oi, ofd_seq_last_oid(oseq));
+ }
+ EXIT;
+ ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_CREATE,
+ tsi->tsi_jobid, 1);
+out: /* reached with os_create_lock held */
+ mutex_unlock(&oseq->os_create_lock);
+out_nolock: /* reached directly by paths that never took os_create_lock */
+ if (rc == 0)
+ rep_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
+
+ ofd_seq_put(tsi->tsi_env, oseq);
+ return rc;
+}
+
+/**
+ * Handle an object destroy request.
+ *
+ * Cancels the locks listed in an optional DLM request buffer, then destroys
+ * one object, or a run of o_misc consecutive object ids when
+ * OBD_MD_FLOBJCOUNT is set in the request obdo. The reply object id is
+ * advanced past every id that was processed.
+ *
+ * \param[in] tsi	target session info; tsi_ost_body carries the request
+ *
+ * \retval 0		on success, or when some objects were missing but a
+ *			transaction was nevertheless recorded
+ * \retval -EROFS	OBD_FAIL_OST_EROFS fail check is active
+ * \retval -EFAULT	DLM request buffer announced but not available
+ * \retval -ENOMEM	reply body is not available
+ * \retval -ENOENT	nothing was destroyed because the objects do not exist
+ * \retval negative	other error from ostid_to_fid() or ofd_destroy_by_fid()
+ */
+static int ofd_destroy_hdl(struct tgt_session_info *tsi)
+{
+	const struct ost_body	*body = tsi->tsi_ost_body;
+	struct ost_body		*repbody;
+	struct ofd_device	*ofd = ofd_exp(tsi->tsi_exp);
+	struct ofd_thread_info	*fti = tsi2ofd_info(tsi);
+	obd_count		 count;
+	int			 rc = 0;
+
+	ENTRY;
+
+	if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
+		RETURN(-EROFS);
+
+	/* This is old case for clients before Lustre 2.4 */
+	/* If there's a DLM request, cancel the locks mentioned in it */
+	if (req_capsule_field_present(tsi->tsi_pill, &RMF_DLM_REQ,
+				      RCL_CLIENT)) {
+		struct ldlm_request *dlm;
+
+		dlm = req_capsule_client_get(tsi->tsi_pill, &RMF_DLM_REQ);
+		if (dlm == NULL)
+			RETURN(-EFAULT);
+		ldlm_request_cancel(tgt_ses_req(tsi), dlm, 0);
+	}
+
+	repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+	/* same check as the other handlers do before touching the reply */
+	if (repbody == NULL)
+		RETURN(-ENOMEM);
+	repbody->oa.o_oi = body->oa.o_oi;
+
+	/* check that o_misc makes sense */
+	if (body->oa.o_valid & OBD_MD_FLOBJCOUNT)
+		count = body->oa.o_misc;
+	else
+		count = 1; /* default case - single destroy */
+
+	/**
+	 * There can be sequence of objects to destroy. Therefore this request
+	 * may have multiple transaction involved in. It is OK, we need only
+	 * the highest used transno to be reported back in reply but not for
+	 * replays, they must report their transno
+	 */
+	if (fti->fti_transno == 0) /* not replay */
+		fti->fti_mult_trans = 1;
+
+	CDEBUG(D_HA, "%s: Destroy object "DOSTID" count %d\n", ofd_name(ofd),
+	       POSTID(&body->oa.o_oi), count);
+	while (count > 0) {
+		int lrc;
+
+		/* the reply oid doubles as the cursor over the id range */
+		lrc = ostid_to_fid(&fti->fti_fid, &repbody->oa.o_oi, 0);
+		if (lrc != 0) {
+			if (rc == 0)
+				rc = lrc;
+			GOTO(out, rc);
+		}
+		lrc = ofd_destroy_by_fid(tsi->tsi_env, ofd, &fti->fti_fid, 0);
+		if (lrc == -ENOENT) {
+			CDEBUG(D_INODE,
+			       "%s: destroying non-existent object "DFID"\n",
+			       ofd_name(ofd), PFID(&fti->fti_fid));
+			/* rewrite rc with -ENOENT only if it is 0 */
+			if (rc == 0)
+				rc = lrc;
+		} else if (lrc != 0) {
+			/* report the error of this object (lrc), not the
+			 * accumulated rc of earlier iterations */
+			CERROR("%s: error destroying object "DFID": %d\n",
+			       ofd_name(ofd), PFID(&fti->fti_fid),
+			       lrc);
+			rc = lrc;
+		}
+		count--;
+		ostid_inc_id(&repbody->oa.o_oi);
+	}
+
+	/* if we have transaction then there were some deletions, we don't
+	 * need to return ENOENT in that case because it will not wait
+	 * for commit of these deletions. The ENOENT must be returned only
+	 * if there were no transactions.
+	 */
+	if (rc == -ENOENT) {
+		if (fti->fti_transno != 0)
+			rc = 0;
+	} else if (rc != 0) {
+		/*
+		 * If we have at least one transaction then llog record
+		 * on server will be removed upon commit, so for rc != 0
+		 * we return no transno and llog record will be reprocessed.
+		 */
+		fti->fti_transno = 0;
+	}
+	ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_DESTROY,
+			 tsi->tsi_jobid, 1);
+out:
+	RETURN(rc);
+}
+
+/**
+ * OST_STATFS handler: fill the obd_statfs buffer of the reply through
+ * ofd_statfs(), accepting cached data no older than
+ * OBD_STATFS_CACHE_SECONDS.
+ *
+ * \param[in] tsi	target session info of the request
+ *
+ * \retval 0		  statfs data stored in the reply
+ * \retval -EINPROGRESS  OBD_FAIL_OST_STATFS_EINPROGRESS fail point is set
+ * \retval negative	  error returned by ofd_statfs()
+ */
+static int ofd_statfs_hdl(struct tgt_session_info *tsi)
+{
+	struct obd_statfs	*reply_sfs;
+	__u64			 oldest_ok;
+	int			 result;
+
+	ENTRY;
+
+	reply_sfs = req_capsule_server_get(tsi->tsi_pill, &RMF_OBD_STATFS);
+	oldest_ok = cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS);
+
+	result = ofd_statfs(tsi->tsi_env, tsi->tsi_exp, reply_sfs,
+			    oldest_ok, 0);
+	if (result != 0)
+		CERROR("%s: statfs failed: rc = %d\n",
+		       tgt_name(tsi->tsi_tgt), result);
+
+	/* the fail point overrides whatever ofd_statfs() returned */
+	if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_EINPROGRESS))
+		result = -EINPROGRESS;
+
+	/* the request is counted regardless of its outcome */
+	ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_STATFS,
+			 tsi->tsi_jobid, 1);
+
+	RETURN(result);
+}
+
+/**
+ * OST_SYNC handler: sync a single object, or the whole target when the
+ * request carries a zero FID.
+ *
+ * For a single object the reply obdo receives the object id and, if they
+ * can be read back after the sync, the object attributes. A failure to read
+ * the attributes is not reported to the caller.
+ *
+ * \param[in] tsi	target session info of the request
+ *
+ * \retval 0		sync completed
+ * \retval negative	error from ofd_object_find_exists() or tgt_sync()
+ */
+static int ofd_sync_hdl(struct tgt_session_info *tsi)
+{
+	struct ost_body		*req_body = tsi->tsi_ost_body;
+	struct ost_body		*rep_body;
+	struct ofd_thread_info	*info = tsi2ofd_info(tsi);
+	struct ofd_device	*dev = ofd_exp(tsi->tsi_exp);
+	struct ofd_object	*obj = NULL;
+	int			 rc;
+
+	ENTRY;
+
+	rep_body = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+
+	/* a zero FID means "sync whole filesystem": nothing to look up */
+	if (!fid_is_zero(&tsi->tsi_fid)) {
+		obj = ofd_object_find_exists(tsi->tsi_env, dev,
+					     &tsi->tsi_fid);
+		if (IS_ERR(obj))
+			RETURN(PTR_ERR(obj));
+	}
+
+	if (obj == NULL)
+		rc = tgt_sync(tsi->tsi_env, tsi->tsi_tgt, NULL);
+	else
+		rc = tgt_sync(tsi->tsi_env, tsi->tsi_tgt,
+			      ofd_object_child(obj));
+	if (rc != 0)
+		GOTO(put, rc);
+
+	ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_SYNC,
+			 tsi->tsi_jobid, 1);
+
+	/* whole-target sync: there is no object to describe in the reply */
+	if (obj == NULL)
+		RETURN(0);
+
+	rep_body->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
+	rep_body->oa.o_oi = req_body->oa.o_oi;
+
+	/* attributes are best effort; rc stays 0 even if getattr fails */
+	if (ofd_attr_get(tsi->tsi_env, obj, &info->fti_attr) == 0)
+		obdo_from_la(&rep_body->oa, &info->fti_attr,
+			     OFD_VALID_FLAGS);
+	EXIT;
+put:
+	if (obj != NULL)
+		ofd_object_put(tsi->tsi_env, obj);
+	return rc;
+}
+/**
+ * OST_PUNCH handler: truncate the object named by tsi->tsi_fid.
+ *
+ * The punch range travels in the request obdo: o_size is the start and
+ * o_blocks is the end. Only end == OBD_OBJECT_EOF (truncate) is accepted.
+ * When the request sets OBD_FL_SRVLOCK, a PW extent lock over the range is
+ * taken here around the punch. After a successful punch
+ * ldlm_res_lvbo_update() is called on the object's resource.
+ *
+ * Returns 0 on success; err_serious(-EPROTO) if size/blocks are not both
+ * flagged valid; err_serious(-ENOMEM) if the reply body is missing; -EPROTO
+ * for a non-EOF end; otherwise the error from tgt_extent_lock(),
+ * ofd_object_find_exists() or ofd_object_punch().
+ */
+static int ofd_punch_hdl(struct tgt_session_info *tsi)
+{
+ const struct obdo *oa = &tsi->tsi_ost_body->oa;
+ struct ost_body *repbody;
+ struct ofd_thread_info *info = tsi2ofd_info(tsi);
+ struct ldlm_namespace *ns = tsi->tsi_tgt->lut_obd->obd_namespace;
+ struct ldlm_resource *res;
+ struct ofd_object *fo;
+ struct filter_fid *ff = NULL; /* stays NULL unless OBD_MD_FLFID is set */
+ __u64 flags = 0; /* LDLM flags handed to tgt_extent_lock() */
+ struct lustre_handle lh = { 0, };
+ int rc;
+ __u64 start, end;
+ bool srvlock;
+
+ ENTRY;
+
+ /* check that we do support OBD_CONNECT_TRUNCLOCK. */
+ CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
+ /* both fields must be valid: they carry the punch start and end */
+ if ((oa->o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
+ (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
+ RETURN(err_serious(-EPROTO));
+
+ repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
+ if (repbody == NULL)
+ RETURN(err_serious(-ENOMEM));
+
+ /* punch start,end are passed in o_size,o_blocks through the wire */
+ start = oa->o_size;
+ end = oa->o_blocks;
+
+ if (end != OBD_OBJECT_EOF) /* Only truncate is supported */
+ RETURN(-EPROTO);
+
+ /* standard truncate optimization: if file body is completely
+ * destroyed, don't send data back to the server. */
+ if (start == 0)
+ flags |= LDLM_FL_AST_DISCARD_DATA;
+
+ repbody->oa.o_oi = oa->o_oi;
+ repbody->oa.o_valid = OBD_MD_FLID;
+ /* OBD_FL_SRVLOCK only counts when OBD_MD_FLFLAGS marks o_flags valid */
+ srvlock = oa->o_valid & OBD_MD_FLFLAGS &&
+ oa->o_flags & OBD_FL_SRVLOCK;
+
+ if (srvlock) {
+ rc = tgt_extent_lock(ns, &tsi->tsi_resid, start, end, &lh,
+ LCK_PW, &flags);
+ if (rc != 0)
+ RETURN(rc);
+ }
+
+ CDEBUG(D_INODE, "calling punch for object "DFID", valid = "LPX64
+ ", start = "LPD64", end = "LPD64"\n", PFID(&tsi->tsi_fid),
+ oa->o_valid, start, end);
+
+ fo = ofd_object_find_exists(tsi->tsi_env, ofd_exp(tsi->tsi_exp),
+ &tsi->tsi_fid);
+ if (IS_ERR(fo))
+ GOTO(out, rc = PTR_ERR(fo)); /* skip the put, still drop the lock */
+ /* times come from the request; the new size is the punch start */
+ la_from_obdo(&info->fti_attr, oa,
+ OBD_MD_FLMTIME | OBD_MD_FLATIME | OBD_MD_FLCTIME);
+ info->fti_attr.la_size = start;
+ info->fti_attr.la_valid |= LA_SIZE;
+ /* with OBD_MD_FLFID, build the filter_fid that is passed to the punch */
+ if (oa->o_valid & OBD_MD_FLFID) {
+ ff = &info->fti_mds_fid;
+ ofd_prepare_fidea(ff, oa);
+ }
+
+ rc = ofd_object_punch(tsi->tsi_env, fo, start, end, &info->fti_attr,
+ ff);
+ if (rc)
+ GOTO(out_put, rc);
+
+ ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_PUNCH,
+ tsi->tsi_jobid, 1);
+ EXIT;
+out_put:
+ ofd_object_put(tsi->tsi_env, fo);
+out:
+ if (srvlock)
+ tgt_extent_unlock(&lh, LCK_PW);
+ if (rc == 0) {
+ /* we do not call this before to avoid lu_object_find() in
+ * ->lvbo_update() holding another reference on the object.
+ * otherwise concurrent destroy can make the object unavailable
+ * for 2nd lu_object_find() waiting for the first reference
+ * to go... deadlock! */
+ res = ldlm_resource_get(ns, NULL, &tsi->tsi_resid,
+ LDLM_EXTENT, 0);
+ if (res != NULL) {
+ ldlm_res_lvbo_update(res, NULL, 0);
+ ldlm_resource_putref(res);
+ }
+ }
+ return rc;
+}
+
+
+/**
+ * OST_QUOTACTL handler: run a quota control command on the bottom device.
+ *
+ * Q_QUOTAON and Q_QUOTAOFF are acknowledged with success without being
+ * executed. Any other command is copied into the reply buffer and passed to
+ * lquotactl_slv(), which operates on that copy.
+ *
+ * \param[in] tsi	target session info of the request
+ *
+ * \retval 0		command succeeded (or was quota on/off)
+ * \retval negative	err_serious(-EPROTO) if the request buffer is
+ *			missing, err_serious(-ENOMEM) if the reply buffer is
+ *			missing, otherwise the result of lquotactl_slv()
+ */
+static int ofd_quotactl(struct tgt_session_info *tsi)
+{
+	struct obd_quotactl	*req_qc;
+	struct obd_quotactl	*rep_qc;
+	int			 result;
+
+	ENTRY;
+
+	req_qc = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_QUOTACTL);
+	if (req_qc == NULL)
+		RETURN(err_serious(-EPROTO));
+
+	rep_qc = req_capsule_server_get(tsi->tsi_pill, &RMF_OBD_QUOTACTL);
+	if (rep_qc == NULL)
+		RETURN(err_serious(-ENOMEM));
+
+	switch (req_qc->qc_cmd) {
+	case Q_QUOTAON:
+	case Q_QUOTAOFF:
+		/* report success for quota on/off for interoperability with
+		 * current MDT stack */
+		RETURN(0);
+	default:
+		break;
+	}
+
+	/* the command is executed on the reply copy of the request */
+	*rep_qc = *req_qc;
+	result = lquotactl_slv(tsi->tsi_env, tsi->tsi_tgt->lut_bottom,
+			       rep_qc);
+
+	ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_QUOTACTL,
+			 tsi->tsi_jobid, 1);
+
+	RETURN(result);
+}
+/*
+ * Name aliases used by the bulk read/write entries of the handler table
+ * below. NOTE(review): presumably TGT_OST_HDL() derives further identifiers
+ * (such as the OBD_FAIL_..._NET fail id) from the opcode name it is given,
+ * so the BRW names must resolve to existing symbols - confirm against the
+ * macro definition.
+ */
+#define OBD_FAIL_OST_READ_NET OBD_FAIL_OST_BRW_NET
+#define OBD_FAIL_OST_WRITE_NET OBD_FAIL_OST_BRW_NET
+#define OST_BRW_READ OST_READ
+#define OST_BRW_WRITE OST_WRITE
+/*
+ * Request handlers for the OST opcodes served by OFD. ofd_common_slice[]
+ * points at this table for the OST_FIRST_OPC..OST_LAST_OPC opcode range.
+ * The first argument of each TGT_OST_HDL() entry is the handler's flag mask.
+ */
+static struct tgt_handler ofd_tgt_handlers[] = {
+TGT_RPC_HANDLER(OST_FIRST_OPC,
+ 0, OST_CONNECT, tgt_connect,
+ &RQF_CONNECT, LUSTRE_OBD_VERSION),
+TGT_RPC_HANDLER(OST_FIRST_OPC,
+ 0, OST_DISCONNECT, tgt_disconnect,
+ &RQF_OST_DISCONNECT, LUSTRE_OBD_VERSION),
+TGT_RPC_HANDLER(OST_FIRST_OPC,
+ 0, OST_SET_INFO, ofd_set_info_hdl,
+ &RQF_OBD_SET_INFO, LUSTRE_OST_VERSION),
+TGT_OST_HDL(0, OST_GET_INFO, ofd_get_info_hdl),
+TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO, OST_GETATTR, ofd_getattr_hdl),
+TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR,
+ OST_SETATTR, ofd_setattr_hdl),
+TGT_OST_HDL(0 | HABEO_REFERO | MUTABOR,
+ OST_CREATE, ofd_create_hdl),
+TGT_OST_HDL(0 | HABEO_REFERO | MUTABOR,
+ OST_DESTROY, ofd_destroy_hdl),
+TGT_OST_HDL(0 | HABEO_REFERO, OST_STATFS, ofd_statfs_hdl),
+TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO, OST_BRW_READ, tgt_brw_read),
+/* don't set CORPUS flag for brw_write because -ENOENT may be valid case */
+TGT_OST_HDL(MUTABOR, OST_BRW_WRITE, tgt_brw_write),
+TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR,
+ OST_PUNCH, ofd_punch_hdl),
+TGT_OST_HDL(HABEO_CORPUS| HABEO_REFERO, OST_SYNC, ofd_sync_hdl),
+TGT_OST_HDL(0 | HABEO_REFERO, OST_QUOTACTL, ofd_quotactl),
+};
+
static struct tgt_opc_slice ofd_common_slice[] = {
{
- .tos_opc_start = UPDATE_OBJ,
- .tos_opc_end = UPDATE_LAST_OPC,
- .tos_hs = tgt_out_handlers
+ .tos_opc_start = OST_FIRST_OPC,
+ .tos_opc_end = OST_LAST_OPC,
+ .tos_hs = ofd_tgt_handlers
+ },
+ {
+ .tos_opc_start = OBD_FIRST_OPC,
+ .tos_opc_end = OBD_LAST_OPC,
+ .tos_hs = tgt_obd_handlers
+ },
+ {
+ .tos_opc_start = LDLM_FIRST_OPC,
+ .tos_opc_end = LDLM_LAST_OPC,
+ .tos_hs = tgt_dlm_handlers
+ },
+ {
+ .tos_opc_start = UPDATE_OBJ,
+ .tos_opc_end = UPDATE_LAST_OPC,
+ .tos_hs = tgt_out_handlers
},
{
.tos_opc_start = SEQ_FIRST_OPC,
#define OFD_FMD_MAX_NUM_DEFAULT 128
#define OFD_FMD_MAX_AGE_DEFAULT ((obd_timeout + 10) * HZ)
-enum {
- LPROC_OFD_READ_BYTES = 0,
- LPROC_OFD_WRITE_BYTES = 1,
- LPROC_OFD_LAST,
-};
-
-/* for job stats */
+/* request stats */
enum {
LPROC_OFD_STATS_READ = 0,
- LPROC_OFD_STATS_WRITE = 1,
- LPROC_OFD_STATS_SETATTR = 2,
- LPROC_OFD_STATS_PUNCH = 3,
- LPROC_OFD_STATS_SYNC = 4,
+ LPROC_OFD_STATS_WRITE,
+ LPROC_OFD_STATS_GETATTR,
+ LPROC_OFD_STATS_SETATTR,
+ LPROC_OFD_STATS_PUNCH,
+ LPROC_OFD_STATS_SYNC,
+ LPROC_OFD_STATS_DESTROY,
+ LPROC_OFD_STATS_CREATE,
+ LPROC_OFD_STATS_STATFS,
+ LPROC_OFD_STATS_GET_INFO,
+ LPROC_OFD_STATS_SET_INFO,
+ LPROC_OFD_STATS_QUOTACTL,
LPROC_OFD_STATS_LAST,
};
static inline void ofd_counter_incr(struct obd_export *exp, int opcode,
char *jobid, long amount)
{
+ if (exp->exp_obd && exp->exp_obd->obd_stats)
+ lprocfs_counter_add(exp->exp_obd->obd_stats, opcode, amount);
+
if (exp->exp_obd && exp->exp_obd->u.obt.obt_jobstats.ojs_hash &&
(exp_connect_flags(exp) & OBD_CONNECT_JOBSTATS))
lprocfs_job_stats_log(exp->exp_obd, jobid, opcode, amount);
if (exp->exp_nid_stats != NULL &&
exp->exp_nid_stats->nid_stats != NULL) {
- if (opcode == LPROC_OFD_STATS_READ)
- lprocfs_counter_add(exp->exp_nid_stats->nid_stats,
- LPROC_OFD_READ_BYTES, amount);
- else if (opcode == LPROC_OFD_STATS_WRITE)
- lprocfs_counter_add(exp->exp_nid_stats->nid_stats,
- LPROC_OFD_WRITE_BYTES, amount);
+ lprocfs_counter_add(exp->exp_nid_stats->nid_stats, opcode,
+ amount);
}
}
unsigned long ofd_raid_degraded:1,
/* sync journal on writes */
ofd_syncjournal:1,
- /* sync on lock cancel */
- ofd_sync_lock_cancel:2,
/* shall we grant space to clients not
* supporting OBD_CONNECT_GRANT_PARAM? */
ofd_grant_compat_disable:1;
int ofd_statfs_internal(const struct lu_env *env, struct ofd_device *ofd,
struct obd_statfs *osfs, __u64 max_age,
int *from_cache);
+int ofd_orphans_destroy(const struct lu_env *env, struct obd_export *exp,
+ struct ofd_device *ofd, struct obdo *oa);
+int ofd_destroy_by_fid(const struct lu_env *env, struct ofd_device *ofd,
+ const struct lu_fid *fid, int orphan);
+int ofd_statfs(const struct lu_env *env, struct obd_export *exp,
+ struct obd_statfs *osfs, __u64 max_age, __u32 flags);
/* ofd_fs.c */
obd_id ofd_seq_last_oid(struct ofd_seq *oseq);
int ofd_attr_handle_ugid(const struct lu_env *env, struct ofd_object *fo,
struct lu_attr *la, int is_setattr);
+/**
+ * Look up an OFD object by FID and require that it exists.
+ *
+ * \param[in] env	execution environment
+ * \param[in] ofd	OFD device to search
+ * \param[in] fid	FID of the object
+ *
+ * \retval		the object found by ofd_object_find() when
+ *			ofd_object_exists() is true for it; the caller is
+ *			expected to release it with ofd_object_put()
+ * \retval ERR_PTR(-ENOENT) the object was found but does not exist (the
+ *			reference is dropped here)
+ * \retval ERR_PTR()	any error pointer returned by ofd_object_find()
+ */
+static inline
+struct ofd_object *ofd_object_find_exists(const struct lu_env *env,
+					   struct ofd_device *ofd,
+					   struct lu_fid *fid)
+{
+	struct ofd_object *found = ofd_object_find(env, ofd, fid);
+
+	/* lookup failures are passed through untouched */
+	if (IS_ERR(found))
+		return found;
+
+	if (ofd_object_exists(found))
+		return found;
+
+	/* drop our reference before reporting the missing object */
+	ofd_object_put(env, found);
+	return ERR_PTR(-ENOENT);
+}
+
/* ofd_grants.c */
#define OFD_GRANT_RATIO_SHIFT 8
static inline __u64 ofd_grant_reserved(struct ofd_device *ofd, obd_size bavail)
void *req_cookie, ldlm_mode_t mode, __u64 flags,
void *data);
-static inline struct ofd_thread_info * ofd_info(const struct lu_env *env)
+static inline struct ofd_thread_info *ofd_info(const struct lu_env *env)
{
struct ofd_thread_info *info;
+ lu_env_refill((void *)env);
info = lu_context_key_get(&env->le_ctx, &ofd_thread_key);
LASSERT(info);
- LASSERT(info->fti_env);
- LASSERT(info->fti_env == env);
return info;
}
-static inline struct ofd_thread_info * ofd_info_init(const struct lu_env *env,
- struct obd_export *exp)
+static inline struct ofd_thread_info *ofd_info_init(const struct lu_env *env,
+ struct obd_export *exp)
{
struct ofd_thread_info *info;
- info = lu_context_key_get(&env->le_ctx, &ofd_thread_key);
- LASSERT(info);
+ info = ofd_info(env);
LASSERT(info->fti_exp == NULL);
LASSERT(info->fti_env == NULL);
LASSERT(info->fti_attr.la_valid == 0);
return info;
}
+/**
+ * Bind the per-thread OFD info to the request described by \a tsi.
+ *
+ * Asserts that the thread info is currently unbound (no export, no env, no
+ * valid attributes), then records the env, the export and the request xid
+ * and clears fti_has_trans. For a request flagged MSG_REPLAY the
+ * pre-operation version and the transno are taken from the request message
+ * (VBR).
+ *
+ * \param[in] tsi	target session info of the request
+ *
+ * \retval		the thread info obtained from ofd_info()
+ */
+static inline struct ofd_thread_info *tsi2ofd_info(struct tgt_session_info *tsi)
+{
+	struct ptlrpc_request	*rpc = tgt_ses_req(tsi);
+	struct ofd_thread_info	*fti = ofd_info(tsi->tsi_env);
+	__u64			*versions;
+
+	LASSERT(fti->fti_exp == NULL);
+	LASSERT(fti->fti_env == NULL);
+	LASSERT(fti->fti_attr.la_valid == 0);
+
+	fti->fti_has_trans = 0;
+	fti->fti_exp = tsi->tsi_exp;
+	fti->fti_env = tsi->tsi_env;
+	fti->fti_xid = rpc->rq_xid;
+
+	/* only a replayed request carries versions/transno to pick up */
+	if (rpc->rq_reqmsg == NULL ||
+	    !(lustre_msg_get_flags(rpc->rq_reqmsg) & MSG_REPLAY))
+		return fti;
+
+	/** VBR: take versions from request */
+	versions = lustre_msg_get_versions(rpc->rq_reqmsg);
+	if (versions != NULL)
+		fti->fti_pre_version = versions[0];
+	else
+		fti->fti_pre_version = 0;
+	fti->fti_transno = lustre_msg_get_transno(rpc->rq_reqmsg);
+
+	return fti;
+}
+
static inline void ofd_oti2info(struct ofd_thread_info *info,
struct obd_trans_info *oti)
{
static inline void ofd_slc_set(struct ofd_device *ofd)
{
if (ofd->ofd_syncjournal == 1)
- ofd->ofd_sync_lock_cancel = NEVER_SYNC_ON_CANCEL;
- else if (ofd->ofd_sync_lock_cancel == NEVER_SYNC_ON_CANCEL)
- ofd->ofd_sync_lock_cancel = ALWAYS_SYNC_ON_CANCEL;
+ ofd->ofd_lut.lut_sync_lock_cancel = NEVER_SYNC_ON_CANCEL;
+ else if (ofd->ofd_lut.lut_sync_lock_cancel == NEVER_SYNC_ON_CANCEL)
+ ofd->ofd_lut.lut_sync_lock_cancel = ALWAYS_SYNC_ON_CANCEL;
}
-static inline void ofd_prepare_fidea(struct filter_fid *ff, struct obdo *oa)
+static inline void ofd_prepare_fidea(struct filter_fid *ff,
+ const struct obdo *oa)
{
- if (!(oa->o_valid & OBD_MD_FLGROUP))
- ostid_set_seq_mdt0(&oa->o_oi);
/* packing fid and converting it to LE for storing into EA.
* Here ->o_stripe_idx should be filled by LOV and rest of
* fields - by client. */
struct ofd_device *ofd, struct lu_fid *fid,
struct lu_attr *la, int niocount,
struct niobuf_remote *rnb, int *nr_local,
- struct niobuf_local *lnb,
- struct obd_trans_info *oti)
+ struct niobuf_local *lnb, char *jobid)
{
struct ofd_object *fo;
int i, j, rc, tot_bytes = 0;
rc = dt_read_prep(env, ofd_object_child(fo), lnb, *nr_local);
if (unlikely(rc))
GOTO(buf_put, rc);
- lprocfs_counter_add(ofd_obd(ofd)->obd_stats,
- LPROC_OFD_READ_BYTES, tot_bytes);
- ofd_counter_incr(exp, LPROC_OFD_STATS_READ,
- oti->oti_jobid, tot_bytes);
+
+ ofd_counter_incr(exp, LPROC_OFD_STATS_READ, jobid, tot_bytes);
RETURN(0);
buf_put:
struct lu_attr *la, struct obdo *oa,
int objcount, struct obd_ioobj *obj,
struct niobuf_remote *rnb, int *nr_local,
- struct niobuf_local *lnb,
- struct obd_trans_info *oti)
+ struct niobuf_local *lnb, char *jobid)
{
struct ofd_object *fo;
int i, j, k, rc = 0, tot_bytes = 0;
GOTO(out, rc = -ENOENT);
}
- /* Always sync if syncjournal parameter is set */
- oti->oti_sync_write = ofd->ofd_syncjournal;
-
/* Process incoming grant info, set OBD_BRW_GRANTED flag and grant some
* space back if possible */
ofd_grant_prepare_write(env, exp, oa, rnb, obj->ioo_bufcnt);
lnb[j+k].lnb_flags = rnb[i].rnb_flags;
if (!(rnb[i].rnb_flags & OBD_BRW_GRANTED))
lnb[j+k].lnb_rc = -ENOSPC;
- if (!(rnb[i].rnb_flags & OBD_BRW_ASYNC))
- oti->oti_sync_write = 1;
/* remote client can't break through quota */
if (exp_connect_rmtclient(exp))
lnb[j+k].lnb_flags &= ~OBD_BRW_NOQUOTA;
if (unlikely(rc != 0))
GOTO(err, rc);
- lprocfs_counter_add(ofd_obd(ofd)->obd_stats,
- LPROC_OFD_WRITE_BYTES, tot_bytes);
- ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE,
- oti->oti_jobid, tot_bytes);
+ ofd_counter_incr(exp, LPROC_OFD_STATS_WRITE, jobid, tot_bytes);
RETURN(0);
err:
dt_bufs_put(env, ofd_object_child(fo), lnb, *nr_local);
return rc;
}
-int ofd_preprw(const struct lu_env* env, int cmd, struct obd_export *exp,
+int ofd_preprw(const struct lu_env *env, int cmd, struct obd_export *exp,
struct obdo *oa, int objcount, struct obd_ioobj *obj,
struct niobuf_remote *rnb, int *nr_local,
struct niobuf_local *lnb, struct obd_trans_info *oti,
struct lustre_capa *capa)
{
+ struct tgt_session_info *tsi = tgt_ses_info(env);
struct ofd_device *ofd = ofd_exp(exp);
struct ofd_thread_info *info;
+ char *jobid;
int rc = 0;
if (*nr_local > PTLRPC_MAX_BRW_PAGES) {
RETURN(-EPROTO);
}
- rc = lu_env_refill((struct lu_env *)env);
- LASSERT(rc == 0);
- info = ofd_info_init(env, exp);
+ if (tgt_ses_req(tsi) == NULL) { /* echo client case */
+ LASSERT(oti != NULL);
+ lu_env_refill((struct lu_env *)env);
+ info = ofd_info_init(env, exp);
+ ofd_oti2info(info, oti);
+ jobid = oti->oti_jobid;
+ } else {
+ info = tsi2ofd_info(tsi);
+ jobid = tsi->tsi_jobid;
+ }
LASSERT(oa != NULL);
if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT)) {
struct ofd_seq *oseq;
+
oseq = ofd_seq_load(env, ofd, ostid_seq(&oa->o_oi));
if (IS_ERR(oseq)) {
CERROR("%s: Can not find seq for "DOSTID
la_from_obdo(&info->fti_attr, oa, OBD_MD_FLGETATTR);
rc = ofd_preprw_write(env, exp, ofd, &info->fti_fid,
&info->fti_attr, oa, objcount,
- obj, rnb, nr_local, lnb, oti);
+ obj, rnb, nr_local, lnb, jobid);
}
} else if (cmd == OBD_BRW_READ) {
rc = ofd_auth_capa(exp, &info->fti_fid, ostid_seq(&oa->o_oi),
ofd_grant_prepare_read(env, exp, oa);
rc = ofd_preprw_read(env, exp, ofd, &info->fti_fid,
&info->fti_attr, obj->ioo_bufcnt,
- rnb, nr_local, lnb, oti);
+ rnb, nr_local, lnb, jobid);
obdo_from_la(oa, &info->fti_attr, LA_ATIME);
}
} else {
ofd_commitrw_write(const struct lu_env *env, struct ofd_device *ofd,
struct lu_fid *fid, struct lu_attr *la,
struct filter_fid *ff, int objcount,
- int niocount, struct niobuf_local *lnb,
- struct obd_trans_info *oti, int old_rc)
+ int niocount, struct niobuf_local *lnb, int old_rc)
{
struct ofd_thread_info *info = ofd_info(env);
struct ofd_object *fo;
struct thandle *th;
int rc = 0;
int retries = 0;
+ int i;
ENTRY;
if (IS_ERR(th))
GOTO(out, rc = PTR_ERR(th));
- th->th_sync |= oti->oti_sync_write;
+ th->th_sync |= ofd->ofd_syncjournal;
+ if (th->th_sync == 0) {
+ for (i = 0; i < niocount; i++) {
+ if (!(lnb[i].lnb_flags & OBD_BRW_ASYNC)) {
+ th->th_sync = 1;
+ break;
+ }
+ }
+ }
if (OBD_FAIL_CHECK(OBD_FAIL_OST_DQACQ_NET))
GOTO(out_stop, rc = -EINPROGRESS);
struct niobuf_local *lnb, struct obd_trans_info *oti,
int old_rc)
{
- struct ofd_thread_info *info;
+ struct ofd_thread_info *info = ofd_info(env);
struct ofd_mod_data *fmd;
__u64 valid;
struct ofd_device *ofd = ofd_exp(exp);
struct filter_fid *ff = NULL;
int rc = 0;
- info = ofd_info(env);
- ofd_oti2info(info, oti);
-
LASSERT(npages > 0);
rc = ostid_to_fid(&info->fti_fid, &oa->o_oi, 0);
rc = ofd_commitrw_write(env, ofd, &info->fti_fid,
&info->fti_attr, ff, objcount, npages,
- lnb, oti, old_rc);
+ lnb, old_rc);
if (rc == 0)
obdo_from_la(oa, &info->fti_attr,
OFD_VALID_FLAGS | LA_GID | LA_UID);
}
}
rc = ofd_commitrw_read(env, ofd, &info->fti_fid, objcount,
- npages, lnb);
+ npages, lnb);
if (old_rc)
rc = old_rc;
} else {
rc = -EPROTO;
}
- ofd_info2oti(info, oti);
+ if (oti != NULL)
+ ofd_info2oti(info, oti);
RETURN(rc);
}
stats = exp->exp_nid_stats;
LASSERT(stats != NULL);
- num_stats = NUM_OBD_STATS + LPROC_OFD_LAST;
+ num_stats = NUM_OBD_STATS + LPROC_OFD_STATS_LAST;
+
stats->nid_stats = lprocfs_alloc_stats(num_stats,
LPROCFS_STATS_FLAG_NOPERCPU);
if (stats->nid_stats == NULL)
return -ENOMEM;
- lprocfs_init_ops_stats(LPROC_OFD_LAST, stats->nid_stats);
- lprocfs_counter_init(stats->nid_stats, LPROC_OFD_READ_BYTES,
- LPROCFS_CNTR_AVGMINMAX, "read_bytes", "bytes");
- lprocfs_counter_init(stats->nid_stats, LPROC_OFD_WRITE_BYTES,
- LPROCFS_CNTR_AVGMINMAX, "write_bytes", "bytes");
-
+ lprocfs_init_ops_stats(LPROC_OFD_STATS_LAST, stats->nid_stats);
+ ofd_stats_counter_init(stats->nid_stats);
rc = lprocfs_register_stats(stats->nid_proc, "stats",
stats->nid_stats);
if (rc)
ofd = ofd_dev(obd->obd_lu_dev);
- rc = lu_env_refill((struct lu_env *)env);
- if (rc != 0) {
- CERROR("Failure to refill session: '%d'\n", rc);
- RETURN(rc);
- }
-
- ofd_info_init(env, exp);
rc = ofd_parse_connect_data(env, exp, data, false);
if (rc == 0)
ofd_export_stats_init(ofd, exp, localdata);
exp = class_conn2export(&conn);
LASSERT(exp != NULL);
- rc = lu_env_refill((struct lu_env *)env);
- if (rc != 0) {
- CERROR("Failure to refill session: '%d'\n", rc);
- GOTO(out, rc);
- }
-
- ofd_info_init(env, exp);
-
rc = ofd_parse_connect_data(env, exp, data, true);
if (rc)
GOTO(out, rc);
ofd_grant_discard(exp);
- rc = lu_env_init(&env, LCT_DT_THREAD);
- if (rc)
- RETURN(rc);
-
/* Do not erase record for recoverable client. */
if (exp->exp_obd->obd_replayable &&
- (!exp->exp_obd->obd_fail || exp->exp_failed))
- tgt_client_del(&env, exp);
- lu_env_fini(&env);
+ (!exp->exp_obd->obd_fail || exp->exp_failed)) {
+ rc = lu_env_init(&env, LCT_DT_THREAD);
+ if (rc)
+ GOTO(out, rc);
+ tgt_client_del(&env, exp);
+ lu_env_fini(&env);
+ }
+out:
class_export_put(exp);
RETURN(rc);
}
ofd_read_unlock(env, fo);
ofd_object_put(env, fo);
}
- } else if (KEY_IS(KEY_SYNC_LOCK_CANCEL)) {
- *((__u32 *) val) = ofd->ofd_sync_lock_cancel;
- *vallen = sizeof(__u32);
} else if (KEY_IS(KEY_LAST_FID)) {
struct ofd_device *ofd = ofd_exp(exp);
struct ofd_seq *oseq;
return 0;
}
-static int ofd_statfs(const struct lu_env *env, struct obd_export *exp,
- struct obd_statfs *osfs, __u64 max_age, __u32 flags)
+int ofd_statfs(const struct lu_env *env, struct obd_export *exp,
+ struct obd_statfs *osfs, __u64 max_age, __u32 flags)
{
struct obd_device *obd = class_exp2obd(exp);
struct ofd_device *ofd = ofd_dev(exp->exp_obd->obd_lu_dev);
return rc;
}
-static int ofd_destroy_by_fid(const struct lu_env *env,
- struct ofd_device *ofd,
- const struct lu_fid *fid, int orphan)
+int ofd_destroy_by_fid(const struct lu_env *env, struct ofd_device *ofd,
+ const struct lu_fid *fid, int orphan)
{
struct ofd_thread_info *info = ofd_info(env);
struct lustre_handle lockh;
RETURN(rc);
}
-static int ofd_orphans_destroy(const struct lu_env *env,
- struct obd_export *exp, struct ofd_device *ofd,
- struct obdo *oa)
+int ofd_orphans_destroy(const struct lu_env *env, struct obd_export *exp,
+ struct ofd_device *ofd, struct obdo *oa)
{
struct ofd_thread_info *info = ofd_info(env);
obd_id last;
struct ofd_thread_info *info = ofd_info(env);
int rc;
- if (info->fti_exp == NULL)
+ if (env->le_ses == NULL || info->fti_exp == NULL)
return 0;
/* declare last_rcvd update */
dt_trans_stop(env, ofd->ofd_osd, th);
}
-/*
- * last_rcvd & last_committed update callbacks
- */
-static int ofd_last_rcvd_update(struct ofd_thread_info *info,
- struct thandle *th)
-{
- struct ofd_device *ofd = ofd_exp(info->fti_exp);
- struct filter_export_data *fed;
- struct lsd_client_data *lcd;
- __s32 rc = th->th_result;
- __u64 *transno_p;
- loff_t off;
- int err;
- bool lw_client = false;
-
- ENTRY;
-
- LASSERT(ofd);
- LASSERT(info->fti_exp);
-
- if (exp_connect_flags(info->fti_exp) & OBD_CONNECT_LIGHTWEIGHT)
- lw_client = true;
-
- fed = &info->fti_exp->exp_filter_data;
- LASSERT(fed);
- lcd = fed->fed_ted.ted_lcd;
- /* if the export has already been disconnected, we have no last_rcvd
- * slot, update server data with latest transno then */
- if (lcd == NULL) {
- CWARN("commit transaction for disconnected client %s: rc %d\n",
- info->fti_exp->exp_client_uuid.uuid, rc);
- err = tgt_server_data_write(info->fti_env, &ofd->ofd_lut, th);
- RETURN(err);
- }
- /* ofd connect may cause transaction before export has last_rcvd
- * slot */
- if (fed->fed_ted.ted_lr_idx < 0 && !lw_client)
- RETURN(0);
- off = fed->fed_ted.ted_lr_off;
-
- transno_p = &lcd->lcd_last_transno;
- lcd->lcd_last_xid = info->fti_xid;
-
- /*
- * When we store zero transno in mcd we can lost last transno value
- * because mcd contains 0, but msd is not yet written
- * The server data should be updated also if the latest
- * transno is rewritten by zero. See the bug 11125 for details.
- */
- if (info->fti_transno == 0 &&
- *transno_p == ofd->ofd_lut.lut_last_transno) {
- spin_lock(&ofd->ofd_lut.lut_translock);
- ofd->ofd_lut.lut_lsd.lsd_last_transno =
- ofd->ofd_lut.lut_last_transno;
- spin_unlock(&ofd->ofd_lut.lut_translock);
- tgt_server_data_write(info->fti_env, &ofd->ofd_lut, th);
- }
-
- *transno_p = info->fti_transno;
- if (lw_client) {
- /* Although lightweight (LW) connections have no slot in
- * last_rcvd, we still want to maintain the in-memory
- * lsd_client_data structure in order to properly handle reply
- * reconstruction. */
- struct lu_target *tg =&ofd->ofd_lut;
- bool update = false;
-
- err = 0;
- /* All operations performed by LW clients are synchronous and
- * we store the committed transno in the last_rcvd header */
- spin_lock(&tg->lut_translock);
- if (info->fti_transno > tg->lut_lsd.lsd_last_transno) {
- tg->lut_lsd.lsd_last_transno = info->fti_transno;
- update = true;
- }
- spin_unlock(&tg->lut_translock);
- if (update)
- err = tgt_server_data_write(info->fti_env, tg, th);
- } else {
- LASSERT(fed->fed_ted.ted_lr_off > 0);
- err = tgt_client_data_write(info->fti_env, &ofd->ofd_lut, lcd,
- &off, th);
- }
-
- RETURN(err);
-}
-
/* Update last_rcvd records with the latest transaction data */
int ofd_txn_stop_cb(const struct lu_env *env, struct thandle *txn,
void *cookie)
{
- struct ofd_device *ofd = cookie;
- struct ofd_thread_info *info;
+ struct ofd_device *ofd = cookie;
+ struct ofd_thread_info *info = ofd_info(env);
+ struct dt_object *obj;
+ struct tgt_session_info *tsi;
+ bool echo_client;
+ int rc;
ENTRY;
- info = lu_context_key_get(&env->le_ctx, &ofd_thread_key);
+ if (env->le_ses == NULL || info->fti_exp == NULL)
+ RETURN(0);
+
+ tsi = tgt_ses_info(env);
- if (info->fti_exp == NULL)
- RETURN(0);
+ echo_client = (tgt_ses_req(tsi) == NULL);
- LASSERT(ofd_exp(info->fti_exp) == ofd);
- if (info->fti_has_trans) {
+ if (info->fti_has_trans && !echo_client) {
if (info->fti_mult_trans == 0) {
CERROR("More than one transaction "LPU64"\n",
info->fti_transno);
info->fti_has_trans = 1;
}
- spin_lock(&ofd->ofd_lut.lut_translock);
- if (txn->th_result != 0) {
- if (info->fti_transno != 0) {
- CERROR("Replay transno "LPU64" failed: rc %d\n",
- info->fti_transno, txn->th_result);
- info->fti_transno = 0;
- }
- } else if (info->fti_transno == 0) {
- info->fti_transno = ++ofd->ofd_lut.lut_last_transno;
- } else {
- /* should be replay */
- if (info->fti_transno > ofd->ofd_lut.lut_last_transno)
- ofd->ofd_lut.lut_last_transno = info->fti_transno;
- }
- spin_unlock(&ofd->ofd_lut.lut_translock);
-
/** VBR: set new versions */
- if (txn->th_result == 0 && info->fti_obj != NULL) {
- dt_version_set(env, ofd_object_child(info->fti_obj),
- info->fti_transno, txn);
- info->fti_obj = NULL;
- }
-
- /* filling reply data */
- CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
- info->fti_transno, ofd_obd(ofd)->obd_last_committed);
-
- /* if can't add callback, do sync write */
- txn->th_sync |= !!tgt_last_commit_cb_add(txn, &ofd->ofd_lut,
- info->fti_exp,
- info->fti_transno);
-
- return ofd_last_rcvd_update(info, txn);
+ if (info->fti_obj != NULL)
+ obj = ofd_object_child(info->fti_obj);
+ else
+ obj = NULL;
+
+ if (unlikely(echo_client)) /* echo client special case */
+ rc = tgt_last_rcvd_update_echo(env, &ofd->ofd_lut, obj, txn,
+ tsi->tsi_exp);
+ else
+ rc = tgt_last_rcvd_update(env, &ofd->ofd_lut, obj, 0, txn,
+ tgt_ses_req(tsi));
+ RETURN(rc);
}
if (req == NULL)
RETURN(-ENOMEM);
- req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY, RCL_CLIENT,
+ req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_KEY, RCL_CLIENT,
sizeof(KEY_LAST_FID));
- req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL, RCL_CLIENT,
- sizeof(struct lu_fid));
-
rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
if (rc) {
ptlrpc_request_free(req);
RETURN(rc);
}
- tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
+ tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_KEY);
memcpy(tmp, KEY_LAST_FID, sizeof(KEY_LAST_FID));
req->rq_no_delay = req->rq_no_resend = 1;
- tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_VAL);
- fid_cpu_to_le((struct lu_fid *)tmp, &d->opd_last_used_fid);
+ last_fid = req_capsule_client_get(&req->rq_pill, &RMF_FID);
+ fid_cpu_to_le(last_fid, &d->opd_last_used_fid);
+
ptlrpc_request_set_replen(req);
rc = ptlrpc_queue_wait(req);
#define DEBUG_SUBSYSTEM S_OST
#include <linux/module.h>
-#include <obd_cksum.h>
#include <obd_ost.h>
-#include <lustre_net.h>
#include <lustre_dlm.h>
-#include <lustre_export.h>
-#include <lustre_debug.h>
-#include <lustre_fid.h>
-#include <lustre_fld.h>
-#include <linux/init.h>
#include <lprocfs_status.h>
-#include <libcfs/list.h>
-#include <lustre_quota.h>
-#include <lustre_fid.h>
#include "ost_internal.h"
-#include <lustre_fid.h>
static int oss_num_threads;
CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
CFS_MODULE_PARM(oss_io_cpts, "s", charp, 0444,
"CPU partitions OSS IO threads should run on");
-/*
- * this page is allocated statically when module is initializing
- * it is used to simulate data corruptions, see ost_checksum_bulk()
- * for details. as the original pages provided by the layers below
- * can be remain in the internal cache, we do not want to modify
- * them.
- */
-static struct page *ost_page_to_corrupt = NULL;
-
-/**
- * Do not return server-side uid/gid to remote client
- */
-static void ost_drop_id(struct obd_export *exp, struct obdo *oa)
-{
- if (exp_connect_rmtclient(exp)) {
- oa->o_uid = -1;
- oa->o_gid = -1;
- oa->o_valid &= ~(OBD_MD_FLUID | OBD_MD_FLGID);
- }
-}
-
-/**
- * Validate oa from client.
- * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
- * req are valid.
- * a. objects in Single MDT FS seq = FID_SEQ_OST_MDT0, oi_id != 0
- * b. Echo objects(seq = 2), old echo client still use oi_id/oi_seq to
- * pack ost_id. Because non-zero oi_seq will make it diffcult to tell
- * whether this is oi_fid or real ostid. So it will check
- * OBD_CONNECT_FID, then convert the ostid to FID for old client.
- * c. Old FID-disable osc will send IDIF.
- * d. new FID-enable osc/osp will send normal FID.
- *
- * And also oi_id/f_oid should always start from 1. oi_id/f_oid = 0 will
- * be used for LAST_ID file, and only being accessed inside OST now.
- */
-static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
- struct obd_ioobj *ioobj)
-{
- int rc = 0;
-
- if (unlikely(!(exp_connect_flags(exp) & OBD_CONNECT_FID) &&
- fid_seq_is_echo(oa->o_oi.oi.oi_seq) && oa != NULL)) {
- /* Sigh 2.[123] client still sends echo req with oi_id = 0
- * during create, and we will reset this to 1, since this
- * oi_id is basically useless in the following create process,
- * but oi_id == 0 will make it difficult to tell whether it is
- * real FID or ost_id. */
- oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1;
- oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO;
- oa->o_oi.oi_fid.f_ver = 0;
- } else {
- if (unlikely((oa == NULL) || ostid_id(&oa->o_oi) == 0))
- GOTO(out, rc = -EPROTO);
-
- /* Note: this check might be forced in 2.5 or 2.6, i.e.
- * all of the requests are required to setup FLGROUP */
- if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
- ostid_set_seq_mdt0(&oa->o_oi);
- if (ioobj)
- ostid_set_seq_mdt0(&ioobj->ioo_oid);
- oa->o_valid |= OBD_MD_FLGROUP;
- }
-
- if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
- fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) ||
- fid_seq_is_norm(ostid_seq(&oa->o_oi)) ||
- fid_seq_is_echo(ostid_seq(&oa->o_oi)))))
- GOTO(out, rc = -EPROTO);
- }
-
- if (ioobj != NULL) {
- unsigned max_brw = ioobj_max_brw_get(ioobj);
-
- if (unlikely((max_brw & (max_brw - 1)) != 0)) {
- CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
- ": rc = -EPROTO\n", exp->exp_obd->obd_name,
- obd_export_nid2str(exp), max_brw,
- POSTID(&oa->o_oi));
- GOTO(out, rc = -EPROTO);
- }
- ioobj->ioo_oid = oa->o_oi;
- }
-
-out:
- if (rc != 0)
- CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
- exp->exp_obd->obd_name, obd_export_nid2str(exp),
- oa ? ostid_seq(&oa->o_oi) : -1,
- oa ? ostid_id(&oa->o_oi) : -1, rc);
- return rc;
-}
-
-void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
-{
- struct oti_req_ack_lock *ack_lock;
- int i;
-
- if (oti == NULL)
- return;
-
- if (req->rq_repmsg) {
- __u64 versions[PTLRPC_NUM_VERSIONS] = { 0 };
- lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
- versions[0] = oti->oti_pre_version;
- lustre_msg_set_versions(req->rq_repmsg, versions);
- }
- req->rq_transno = oti->oti_transno;
-
- /* XXX 4 == entries in oti_ack_locks??? */
- for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
- if (!ack_lock->mode)
- break;
- /* XXX not even calling target_send_reply in some cases... */
- ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode, 0);
- }
-}
-
-static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
- struct obd_trans_info *oti)
-{
- struct ost_body *body, *repbody;
- struct lustre_capa *capa = NULL;
- int rc;
- ENTRY;
-
- /* Get the request body */
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- RETURN(-EFAULT);
-
- if (ostid_id(&body->oa.o_oi) == 0)
- RETURN(-EPROTO);
-
- rc = ost_validate_obdo(exp, &body->oa, NULL);
- if (rc)
- RETURN(rc);
-
- /* If there's a DLM request, cancel the locks mentioned in it*/
- if (req_capsule_field_present(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT)) {
- struct ldlm_request *dlm;
-
- dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
- if (dlm == NULL)
- RETURN (-EFAULT);
- ldlm_request_cancel(req, dlm, 0);
- }
-
- /* If there's a capability, get it */
- if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
- capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
- if (capa == NULL) {
- CERROR("Missing capability for OST DESTROY");
- RETURN (-EFAULT);
- }
- }
-
- /* Prepare the reply */
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
-
- /* Get the log cancellation cookie */
- if (body->oa.o_valid & OBD_MD_FLCOOKIE)
- oti->oti_logcookies = &body->oa.o_lcookie;
-
- /* Finish the reply */
- repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
-
- /* Do the destroy and set the reply status accordingly */
- req->rq_status = obd_destroy(req->rq_svc_thread->t_env, exp,
- &repbody->oa, NULL, oti, NULL, capa);
- RETURN(0);
-}
-
-/**
- * Helper function for getting server side [start, start+count] DLM lock
- * if asked by client.
- */
-static int ost_lock_get(struct obd_export *exp, struct obdo *oa,
- __u64 start, __u64 count, struct lustre_handle *lh,
- int mode, __u64 flags)
-{
- struct ldlm_res_id res_id;
- ldlm_policy_data_t policy;
- __u64 end = start + count;
-
- ENTRY;
-
- LASSERT(!lustre_handle_is_used(lh));
- /* o_id and o_gr are used for localizing resource, if client miss to set
- * them, do not trigger ASSERTION. */
- if (unlikely((oa->o_valid & (OBD_MD_FLID | OBD_MD_FLGROUP)) !=
- (OBD_MD_FLID | OBD_MD_FLGROUP)))
- RETURN(-EPROTO);
-
- if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
- !(oa->o_flags & OBD_FL_SRVLOCK))
- RETURN(0);
-
- if (mode == LCK_MINMODE)
- RETURN(0);
-
- ostid_build_res_name(&oa->o_oi, &res_id);
- CDEBUG(D_INODE, "OST-side extent lock.\n");
-
- policy.l_extent.start = start & CFS_PAGE_MASK;
-
- /* If ->o_blocks is EOF it means "lock till the end of the
- * file". Otherwise, it's size of a hole being punched (in bytes) */
- if (count == OBD_OBJECT_EOF || end < start)
- policy.l_extent.end = OBD_OBJECT_EOF;
- else
- policy.l_extent.end = end | ~CFS_PAGE_MASK;
-
- RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
- LDLM_EXTENT, &policy, mode, &flags,
- ldlm_blocking_ast, ldlm_completion_ast,
- ldlm_glimpse_ast, NULL, 0, LVB_T_NONE,
- NULL, lh));
-}
-
-/* Helper function: release lock, if any. */
-static void ost_lock_put(struct obd_export *exp,
- struct lustre_handle *lh, int mode)
-{
- ENTRY;
- if (lustre_handle_is_used(lh))
- ldlm_lock_decref(lh, mode);
- EXIT;
-}
-
-static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
-{
- struct ost_body *body, *repbody;
- struct obd_info *oinfo;
- struct lustre_handle lh = { 0 };
- struct lustre_capa *capa = NULL;
- ldlm_mode_t lock_mode;
- int rc;
- ENTRY;
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- RETURN(-EFAULT);
-
- rc = ost_validate_obdo(exp, &body->oa, NULL);
- if (rc)
- RETURN(rc);
-
- if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
- capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
- if (capa == NULL) {
- CERROR("Missing capability for OST GETATTR");
- RETURN(-EFAULT);
- }
- }
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
-
- repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- repbody->oa = body->oa;
-
- lock_mode = LCK_MINMODE;
- if (body->oa.o_valid & OBD_MD_FLFLAGS &&
- body->oa.o_flags & OBD_FL_SRVLOCK) {
- lock_mode = LCK_PR;
- if (body->oa.o_flags & OBD_FL_FLUSH)
- lock_mode = LCK_PW;
- }
- rc = ost_lock_get(exp, &repbody->oa, 0, OBD_OBJECT_EOF, &lh,
- lock_mode, 0);
- if (rc)
- RETURN(rc);
-
- OBD_ALLOC_PTR(oinfo);
- if (!oinfo)
- GOTO(unlock, rc = -ENOMEM);
- oinfo->oi_oa = &repbody->oa;
- oinfo->oi_capa = capa;
-
- req->rq_status = obd_getattr(req->rq_svc_thread->t_env, exp, oinfo);
-
- OBD_FREE_PTR(oinfo);
-
- ost_drop_id(exp, &repbody->oa);
-
- if (!(repbody->oa.o_valid & OBD_MD_FLFLAGS)) {
- repbody->oa.o_valid |= OBD_MD_FLFLAGS;
- repbody->oa.o_flags = 0;
- }
- repbody->oa.o_flags |= OBD_FL_FLUSH;
-
-unlock:
- ost_lock_put(exp, &lh, lock_mode);
- RETURN(rc);
-}
-
-static int ost_statfs(struct ptlrpc_request *req)
-{
- struct obd_statfs *osfs;
- int rc;
- ENTRY;
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
-
- osfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
-
- req->rq_status = obd_statfs(req->rq_svc_thread->t_env, req->rq_export,
- osfs,
- cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
- 0);
- if (req->rq_status != 0)
- CERROR("ost: statfs failed: rc %d\n", req->rq_status);
-
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_EINPROGRESS))
- req->rq_status = -EINPROGRESS;
-
- RETURN(0);
-}
-
-static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
- struct obd_trans_info *oti)
-{
- struct ost_body *body, *repbody;
- int rc;
- ENTRY;
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- RETURN(-EFAULT);
-
- rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
- if (rc)
- RETURN(rc);
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
-
- repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- repbody->oa = body->oa;
- oti->oti_logcookies = &body->oa.o_lcookie;
-
- req->rq_status = obd_create(req->rq_svc_thread->t_env, exp,
- &repbody->oa, NULL, oti);
- //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
- RETURN(0);
-}
-
-static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
- struct obd_trans_info *oti)
-{
- struct ost_body *body, *repbody;
- __u64 flags = 0;
- struct lustre_handle lh = {0,};
- int rc;
- ENTRY;
-
- /* check that we do support OBD_CONNECT_TRUNCLOCK. */
- CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- RETURN(-EFAULT);
-
- rc = ost_validate_obdo(exp, &body->oa, NULL);
- if (rc)
- RETURN(rc);
-
- if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
- (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
- RETURN(-EPROTO);
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
-
- /* standard truncate optimization: if file body is completely
- * destroyed, don't send data back to the server. */
- if (body->oa.o_size == 0)
- flags |= LDLM_FL_AST_DISCARD_DATA;
-
- repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- repbody->oa = body->oa;
-
- rc = ost_lock_get(exp, &repbody->oa, repbody->oa.o_size,
- repbody->oa.o_blocks, &lh, LCK_PW, flags);
- if (rc == 0) {
- struct obd_info *oinfo;
- struct lustre_capa *capa = NULL;
-
- if (repbody->oa.o_valid & OBD_MD_FLFLAGS &&
- repbody->oa.o_flags == OBD_FL_SRVLOCK)
- /*
- * If OBD_FL_SRVLOCK is the only bit set in
- * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
- * through filter_setattr() to filter_iocontrol().
- */
- repbody->oa.o_valid &= ~OBD_MD_FLFLAGS;
-
- if (repbody->oa.o_valid & OBD_MD_FLOSSCAPA) {
- capa = req_capsule_client_get(&req->rq_pill,
- &RMF_CAPA1);
- if (capa == NULL) {
- CERROR("Missing capability for OST PUNCH");
- GOTO(unlock, rc = -EFAULT);
- }
- }
-
- OBD_ALLOC_PTR(oinfo);
- if (!oinfo)
- GOTO(unlock, rc = -ENOMEM);
- oinfo->oi_oa = &repbody->oa;
- oinfo->oi_policy.l_extent.start = oinfo->oi_oa->o_size;
- oinfo->oi_policy.l_extent.end = oinfo->oi_oa->o_blocks;
- oinfo->oi_capa = capa;
- oinfo->oi_flags = OBD_FL_PUNCH;
-
- req->rq_status = obd_punch(req->rq_svc_thread->t_env, exp,
- oinfo, oti, NULL);
- OBD_FREE_PTR(oinfo);
-unlock:
- ost_lock_put(exp, &lh, LCK_PW);
- }
-
- ost_drop_id(exp, &repbody->oa);
- RETURN(rc);
-}
-
-static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req,
- struct obd_trans_info *oti)
-{
- struct ost_body *body, *repbody;
- struct obd_info *oinfo;
- struct lustre_capa *capa = NULL;
- int rc;
- ENTRY;
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- RETURN(-EFAULT);
-
- rc = ost_validate_obdo(exp, &body->oa, NULL);
- if (rc)
- RETURN(rc);
-
- if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
- capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
- if (capa == NULL) {
- CERROR("Missing capability for OST SYNC");
- RETURN (-EFAULT);
- }
- }
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
-
- repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- repbody->oa = body->oa;
-
- OBD_ALLOC_PTR(oinfo);
- if (!oinfo)
- RETURN(-ENOMEM);
-
- oinfo->oi_oa = &repbody->oa;
- oinfo->oi_capa = capa;
- oinfo->oi_jobid = oti->oti_jobid;
- req->rq_status = obd_sync(req->rq_svc_thread->t_env, exp, oinfo,
- repbody->oa.o_size, repbody->oa.o_blocks,
- NULL);
- OBD_FREE_PTR(oinfo);
-
- ost_drop_id(exp, &repbody->oa);
- RETURN(0);
-}
-
-static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
- struct obd_trans_info *oti)
-{
- struct ost_body *body, *repbody;
- struct obd_info *oinfo;
- struct lustre_capa *capa = NULL;
- int rc;
- ENTRY;
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- RETURN(-EFAULT);
-
- rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
- if (rc)
- RETURN(rc);
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
-
- if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
- capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
- if (capa == NULL) {
- CERROR("Missing capability for OST SETATTR");
- RETURN (-EFAULT);
- }
- }
-
- repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- repbody->oa = body->oa;
-
- OBD_ALLOC_PTR(oinfo);
- if (!oinfo)
- RETURN(-ENOMEM);
- oinfo->oi_oa = &repbody->oa;
- oinfo->oi_capa = capa;
-
- req->rq_status = obd_setattr(req->rq_svc_thread->t_env, exp, oinfo,
- oti);
-
- OBD_FREE_PTR(oinfo);
-
- ost_drop_id(exp, &repbody->oa);
- RETURN(0);
-}
-
-static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
- cksum_type_t cksum_type)
-{
- struct cfs_crypto_hash_desc *hdesc;
- unsigned int bufsize;
- int i, err;
- unsigned char cfs_alg = cksum_obd2cfs(cksum_type);
- __u32 cksum;
-
- hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
- if (IS_ERR(hdesc)) {
- CERROR("Unable to initialize checksum hash %s\n",
- cfs_crypto_hash_name(cfs_alg));
- return PTR_ERR(hdesc);
- }
- CDEBUG(D_INFO, "Checksum for algo %s\n", cfs_crypto_hash_name(cfs_alg));
- for (i = 0; i < desc->bd_iov_count; i++) {
-
- /* corrupt the data before we compute the checksum, to
- * simulate a client->OST data error */
- if (i == 0 && opc == OST_WRITE &&
- OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE)) {
- int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
- int len = desc->bd_iov[i].kiov_len;
- struct page *np = ost_page_to_corrupt;
- char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
-
- if (np) {
- char *ptr2 = kmap(np) + off;
-
- memcpy(ptr2, ptr, len);
- memcpy(ptr2, "bad3", min(4, len));
- kunmap(np);
- desc->bd_iov[i].kiov_page = np;
- } else {
- CERROR("can't alloc page for corruption\n");
- }
- }
- cfs_crypto_hash_update_page(hdesc, desc->bd_iov[i].kiov_page,
- desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK,
- desc->bd_iov[i].kiov_len);
-
- /* corrupt the data after we compute the checksum, to
- * simulate an OST->client data error */
- if (i == 0 && opc == OST_READ &&
- OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
- int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
- int len = desc->bd_iov[i].kiov_len;
- struct page *np = ost_page_to_corrupt;
- char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
-
- if (np) {
- char *ptr2 = kmap(np) + off;
-
- memcpy(ptr2, ptr, len);
- memcpy(ptr2, "bad4", min(4, len));
- kunmap(np);
- desc->bd_iov[i].kiov_page = np;
- } else {
- CERROR("can't alloc page for corruption\n");
- }
- }
- }
-
- bufsize = 4;
- err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
- if (err)
- cfs_crypto_hash_final(hdesc, NULL, NULL);
-
- return cksum;
-}
-
-static int ost_brw_lock_get(int mode, struct obd_export *exp,
- struct obd_ioobj *obj, struct niobuf_remote *nb,
- struct lustre_handle *lh)
-{
- __u64 flags = 0;
- int nrbufs = obj->ioo_bufcnt;
- struct ldlm_res_id res_id;
- ldlm_policy_data_t policy;
- int i;
- ENTRY;
-
- ostid_build_res_name(&obj->ioo_oid, &res_id);
- LASSERT(mode == LCK_PR || mode == LCK_PW);
- LASSERT(!lustre_handle_is_used(lh));
-
- if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
- RETURN(0);
-
- for (i = 1; i < nrbufs; i ++)
- if ((nb[0].flags & OBD_BRW_SRVLOCK) !=
- (nb[i].flags & OBD_BRW_SRVLOCK))
- RETURN(-EFAULT);
-
- policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
- policy.l_extent.end = (nb[nrbufs - 1].offset +
- nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
-
- RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
- LDLM_EXTENT, &policy, mode, &flags,
- ldlm_blocking_ast, ldlm_completion_ast,
- ldlm_glimpse_ast, NULL, 0, LVB_T_NONE,
- NULL, lh));
-}
-
-static void ost_brw_lock_put(int mode,
- struct obd_ioobj *obj, struct niobuf_remote *niob,
- struct lustre_handle *lh)
-{
- ENTRY;
- LASSERT(mode == LCK_PR || mode == LCK_PW);
- LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
- lustre_handle_is_used(lh));
- if (lustre_handle_is_used(lh))
- ldlm_lock_decref(lh, mode);
- EXIT;
-}
-
-/* Allocate thread local buffers if needed */
-static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r)
-{
- struct ost_thread_local_cache *tls =
- (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
-
- /* In normal mode of operation an I/O request is serviced only
- * by ll_ost_io threads each of them has own tls buffers allocated by
- * ost_io_thread_init().
- * During recovery, an I/O request may be queued until any of the ost
- * service threads process it. Not necessary it should be one of
- * ll_ost_io threads. In that case we dynamically allocating tls
- * buffers for the request service time. */
- if (unlikely(tls == NULL)) {
- LASSERT(r->rq_export->exp_in_recovery);
- OBD_ALLOC_PTR(tls);
- if (tls != NULL) {
- tls->temporary = 1;
- r->rq_svc_thread->t_data = tls;
- }
- }
- return tls;
-}
-
-/* Free thread local buffers if they were allocated only for servicing
- * this one request */
-static void ost_tls_put(struct ptlrpc_request *r)
-{
- struct ost_thread_local_cache *tls =
- (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
-
- if (unlikely(tls->temporary)) {
- OBD_FREE_PTR(tls);
- r->rq_svc_thread->t_data = NULL;
- }
-}
-
-static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
-{
- struct ptlrpc_bulk_desc *desc = NULL;
- struct obd_export *exp = req->rq_export;
- struct niobuf_remote *remote_nb;
- struct niobuf_local *local_nb;
- struct obd_ioobj *ioo;
- struct ost_body *body, *repbody;
- struct lustre_capa *capa = NULL;
- struct l_wait_info lwi;
- struct lustre_handle lockh = { 0 };
- int niocount, npages, nob = 0, rc, i;
- int no_reply = 0;
- struct ost_thread_local_cache *tls;
- ENTRY;
-
- req->rq_bulk_read = 1;
-
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
- GOTO(out, rc = -EIO);
-
- OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
-
- /* Check if there is eviction in progress, and if so, wait for it to
- * finish */
- if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
- lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
- rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
- !cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress),
- &lwi);
- }
- if (exp->exp_failed)
- GOTO(out, rc = -ENOTCONN);
-
- /* ost_body, ioobj & noibuf_remote are verified and swabbed in
- * ost_rw_hpreq_check(). */
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- GOTO(out, rc = -EFAULT);
-
- /*
- * A req_capsule_X_get_array(pill, field, ptr_to_element_count) function
- * would be useful here and wherever we get &RMF_OBD_IOOBJ and
- * &RMF_NIOBUF_REMOTE.
- */
- ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
- if (ioo == NULL)
- GOTO(out, rc = -EFAULT);
-
- rc = ost_validate_obdo(exp, &body->oa, ioo);
- if (rc)
- RETURN(rc);
-
- niocount = ioo->ioo_bufcnt;
- remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
- if (remote_nb == NULL)
- GOTO(out, rc = -EFAULT);
-
- if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
- capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
- if (capa == NULL) {
- CERROR("Missing capability for OST BRW READ");
- GOTO(out, rc = -EFAULT);
- }
- }
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- GOTO(out, rc);
-
- tls = ost_tls_get(req);
- if (tls == NULL)
- GOTO(out_bulk, rc = -ENOMEM);
- local_nb = tls->local;
-
- rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
- if (rc != 0)
- GOTO(out_tls, rc);
-
- /*
- * If getting the lock took more time than
- * client was willing to wait, drop it. b=11330
- */
- if (cfs_time_current_sec() > req->rq_deadline ||
- OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
- no_reply = 1;
- CERROR("Dropping timed-out read from %s because locking"
- "object "DOSTID" took %ld seconds (limit was %ld).\n",
- libcfs_id2str(req->rq_peer), POSTID(&ioo->ioo_oid),
- cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
- req->rq_deadline - req->rq_arrival_time.tv_sec);
- GOTO(out_lock, rc = -ETIMEDOUT);
- }
-
- repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
-
- npages = OST_THREAD_POOL_SIZE;
- rc = obd_preprw(req->rq_svc_thread->t_env, OBD_BRW_READ, exp,
- &repbody->oa, 1, ioo, remote_nb, &npages, local_nb,
- oti, capa);
- if (rc != 0)
- GOTO(out_lock, rc);
-
- desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
- BULK_PUT_SOURCE, OST_BULK_PORTAL);
- if (desc == NULL)
- GOTO(out_commitrw, rc = -ENOMEM);
-
- nob = 0;
- for (i = 0; i < npages; i++) {
- int page_rc = local_nb[i].rc;
-
- if (page_rc < 0) { /* error */
- rc = page_rc;
- break;
- }
-
- nob += page_rc;
- if (page_rc != 0) { /* some data! */
- LASSERT (local_nb[i].page != NULL);
- ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
- local_nb[i].lnb_page_offset,
- page_rc);
- }
-
- if (page_rc != local_nb[i].len) { /* short read */
- /* All subsequent pages should be 0 */
- while(++i < npages)
- LASSERT(local_nb[i].rc == 0);
- break;
- }
- }
-
- if (body->oa.o_valid & OBD_MD_FLCKSUM) {
- cksum_type_t cksum_type =
- cksum_type_unpack(repbody->oa.o_valid & OBD_MD_FLFLAGS ?
- repbody->oa.o_flags : 0);
- repbody->oa.o_flags = cksum_type_pack(cksum_type);
- repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
- repbody->oa.o_cksum = ost_checksum_bulk(desc, OST_READ,cksum_type);
- CDEBUG(D_PAGE, "checksum at read origin: %x\n",
- repbody->oa.o_cksum);
- } else {
- repbody->oa.o_valid = 0;
- }
- /* We're finishing using body->oa as an input variable */
-
- /* Check if client was evicted while we were doing i/o before touching
- network */
- if (rc == 0) {
- if (likely(!CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2)))
- rc = target_bulk_io(exp, desc, &lwi);
- no_reply = rc != 0;
- }
-
-out_commitrw:
- /* Must commit after prep above in all cases */
- rc = obd_commitrw(req->rq_svc_thread->t_env, OBD_BRW_READ, exp,
- &repbody->oa, 1, ioo, remote_nb, npages, local_nb,
- oti, rc);
-
- if (rc == 0)
- ost_drop_id(exp, &repbody->oa);
-
-out_lock:
- ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
-out_tls:
- ost_tls_put(req);
-out_bulk:
- if (desc && !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))
- ptlrpc_free_bulk_nopin(desc);
-out:
- LASSERT(rc <= 0);
- if (rc == 0) {
- req->rq_status = nob;
- ptlrpc_lprocfs_brw(req, nob);
- target_committed_to_req(req);
- ptlrpc_reply(req);
- } else if (!no_reply) {
- /* Only reply if there was no comms problem with bulk */
- target_committed_to_req(req);
- req->rq_status = rc;
- ptlrpc_error(req);
- } else {
- /* reply out callback would free */
- ptlrpc_req_drop_rs(req);
- LCONSOLE_WARN("%s: Bulk IO read error with %s (at %s), "
- "client will retry: rc %d\n",
- exp->exp_obd->obd_name,
- obd_uuid2str(&exp->exp_client_uuid),
- obd_export_nid2str(exp), rc);
- }
- /* send a bulk after reply to simulate a network delay or reordering
- * by a router */
- if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) {
- wait_queue_head_t waitq;
- struct l_wait_info lwi1;
-
- CDEBUG(D_INFO, "reorder BULK\n");
- init_waitqueue_head(&waitq);
-
- lwi1 = LWI_TIMEOUT_INTR(cfs_time_seconds(3), NULL, NULL, NULL);
- l_wait_event(waitq, 0, &lwi1);
- rc = target_bulk_io(exp, desc, &lwi);
- ptlrpc_free_bulk_nopin(desc);
- }
-
- RETURN(rc);
-}
-
-static void ost_warn_on_cksum(struct ptlrpc_request *req,
- struct ptlrpc_bulk_desc *desc,
- struct niobuf_local *local_nb, int npages,
- obd_count client_cksum, obd_count server_cksum,
- int mmap)
-{
- struct obd_export *exp = req->rq_export;
- struct ost_body *body;
- char *router;
- char *via;
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- LASSERT (body != NULL);
-
- if (req->rq_peer.nid == desc->bd_sender) {
- via = router = "";
- } else {
- via = " via ";
- router = libcfs_nid2str(desc->bd_sender);
- }
-
- if (mmap) {
- CDEBUG_LIMIT(D_INFO, "client csum %x, server csum %x\n",
- client_cksum, server_cksum);
- return;
- }
-
- LCONSOLE_ERROR_MSG(0x168, "BAD WRITE CHECKSUM: %s from %s%s%s inode "
- DFID" object "DOSTID" extent ["LPU64"-"LPU64
- "]: client csum %x, server csum %x\n",
- exp->exp_obd->obd_name, libcfs_id2str(req->rq_peer),
- via, router,
- body->oa.o_valid & OBD_MD_FLFID ?
- body->oa.o_parent_seq : (__u64)0,
- body->oa.o_valid & OBD_MD_FLFID ?
- body->oa.o_parent_oid : 0,
- body->oa.o_valid & OBD_MD_FLFID ?
- body->oa.o_parent_ver : 0,
- POSTID(&body->oa.o_oi),
- local_nb[0].lnb_file_offset,
- local_nb[npages-1].lnb_file_offset +
- local_nb[npages-1].len - 1,
- client_cksum, server_cksum);
-}
-
-static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
-{
- struct ptlrpc_bulk_desc *desc = NULL;
- struct obd_export *exp = req->rq_export;
- struct niobuf_remote *remote_nb;
- struct niobuf_local *local_nb;
- struct obd_ioobj *ioo;
- struct ost_body *body, *repbody;
- struct l_wait_info lwi;
- struct lustre_handle lockh = {0};
- struct lustre_capa *capa = NULL;
- __u32 *rcs;
- int objcount, niocount, npages;
- int rc, i, j;
- obd_count client_cksum = 0, server_cksum = 0;
- cksum_type_t cksum_type = OBD_CKSUM_CRC32;
- int no_reply = 0, mmap = 0;
- __u32 o_uid = 0, o_gid = 0;
- struct ost_thread_local_cache *tls;
- ENTRY;
-
- req->rq_bulk_write = 1;
-
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
- GOTO(out, rc = -EIO);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
- GOTO(out, rc = -EFAULT);
-
- /* pause before transaction has been started */
- OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
-
- /* ost_body, ioobj & noibuf_remote are verified and swabbed in
- * ost_rw_hpreq_check(). */
- body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
- if (body == NULL)
- GOTO(out, rc = -EFAULT);
-
- objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
- RCL_CLIENT) / sizeof(*ioo);
- ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
- if (ioo == NULL)
- GOTO(out, rc = -EFAULT);
-
- rc = ost_validate_obdo(exp, &body->oa, ioo);
- if (rc)
- RETURN(rc);
-
- for (niocount = i = 0; i < objcount; i++)
- niocount += ioo[i].ioo_bufcnt;
-
- /*
- * It'd be nice to have a capsule function to indicate how many elements
- * there were in a buffer for an RMF that's declared to be an array.
- * It's easy enough to compute the number of elements here though.
- */
- remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
- if (remote_nb == NULL || niocount != (req_capsule_get_size(&req->rq_pill,
- &RMF_NIOBUF_REMOTE, RCL_CLIENT) / sizeof(*remote_nb)))
- GOTO(out, rc = -EFAULT);
-
- if ((remote_nb[0].flags & OBD_BRW_MEMALLOC) &&
- (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
- memory_pressure_set();
-
- if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
- capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
- if (capa == NULL) {
- CERROR("Missing capability for OST BRW WRITE");
- GOTO(out, rc = -EFAULT);
- }
- }
-
- req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER,
- niocount * sizeof(*rcs));
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc != 0)
- GOTO(out, rc);
- CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, cfs_fail_val);
- rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
-
- tls = ost_tls_get(req);
- if (tls == NULL)
- GOTO(out_bulk, rc = -ENOMEM);
- local_nb = tls->local;
-
- rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
- if (rc != 0)
- GOTO(out_tls, rc);
-
- /*
- * If getting the lock took more time than
- * client was willing to wait, drop it. b=11330
- */
- if (cfs_time_current_sec() > req->rq_deadline ||
- OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
- no_reply = 1;
- CERROR("Dropping timed-out write from %s because locking "
- "object "DOSTID" took %ld seconds (limit was %ld).\n",
- libcfs_id2str(req->rq_peer), POSTID(&ioo->ioo_oid),
- cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
- req->rq_deadline - req->rq_arrival_time.tv_sec);
- GOTO(out_lock, rc = -ETIMEDOUT);
- }
-
- /* obd_preprw clobbers oa->valid, so save what we need */
- if (body->oa.o_valid & OBD_MD_FLCKSUM) {
- client_cksum = body->oa.o_cksum;
- if (body->oa.o_valid & OBD_MD_FLFLAGS)
- cksum_type = cksum_type_unpack(body->oa.o_flags);
- }
- if (body->oa.o_valid & OBD_MD_FLFLAGS && body->oa.o_flags & OBD_FL_MMAP)
- mmap = 1;
-
- /* Because we already sync grant info with client when reconnect,
- * grant info will be cleared for resent req, then fed_grant and
- * total_grant will not be modified in following preprw_write */
- if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
- DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
- body->oa.o_valid &= ~OBD_MD_FLGRANT;
- }
-
- if (exp_connect_rmtclient(exp)) {
- o_uid = body->oa.o_uid;
- o_gid = body->oa.o_gid;
- }
-
- repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
- memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
-
- npages = OST_THREAD_POOL_SIZE;
- rc = obd_preprw(req->rq_svc_thread->t_env, OBD_BRW_WRITE, exp,
- &repbody->oa, objcount, ioo, remote_nb, &npages,
- local_nb, oti, capa);
- if (rc != 0)
- GOTO(out_lock, rc);
-
- desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
- BULK_GET_SINK, OST_BULK_PORTAL);
- if (desc == NULL)
- GOTO(skip_transfer, rc = -ENOMEM);
-
- /* NB Having prepped, we must commit... */
- for (i = 0; i < npages; i++)
- ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
- local_nb[i].lnb_page_offset,
- local_nb[i].len);
-
- rc = sptlrpc_svc_prep_bulk(req, desc);
- if (rc != 0)
- GOTO(out_lock, rc);
-
- rc = target_bulk_io(exp, desc, &lwi);
- no_reply = rc != 0;
-
-skip_transfer:
- if (client_cksum != 0 && rc == 0) {
- static int cksum_counter;
- repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
- repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
- repbody->oa.o_flags |= cksum_type_pack(cksum_type);
- server_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
- repbody->oa.o_cksum = server_cksum;
- cksum_counter++;
- if (unlikely(client_cksum != server_cksum)) {
- ost_warn_on_cksum(req, desc, local_nb, npages,
- client_cksum, server_cksum, mmap);
- cksum_counter = 0;
-
- } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
- CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
- cksum_counter, libcfs_id2str(req->rq_peer),
- server_cksum);
- }
- }
-
- /* Must commit after prep above in all cases */
- rc = obd_commitrw(req->rq_svc_thread->t_env, OBD_BRW_WRITE, exp,
- &repbody->oa, objcount, ioo, remote_nb, npages,
- local_nb, oti, rc);
- if (rc == -ENOTCONN)
- /* quota acquire process has been given up because
- * either the client has been evicted or the client
- * has timed out the request already */
- no_reply = 1;
-
- if (exp_connect_rmtclient(exp)) {
- repbody->oa.o_uid = o_uid;
- repbody->oa.o_gid = o_gid;
- }
-
- /*
- * Disable sending mtime back to the client. If the client locked the
- * whole object, then it has already updated the mtime on its side,
- * otherwise it will have to glimpse anyway (see bug 21489, comment 32)
- */
- repbody->oa.o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLATIME);
-
- if (rc == 0) {
- int nob = 0;
-
- /* set per-requested niobuf return codes */
- for (i = j = 0; i < niocount; i++) {
- int len = remote_nb[i].len;
-
- nob += len;
- rcs[i] = 0;
- do {
- LASSERT(j < npages);
- if (local_nb[j].rc < 0)
- rcs[i] = local_nb[j].rc;
- len -= local_nb[j].len;
- j++;
- } while (len > 0);
- LASSERT(len == 0);
- }
- LASSERT(j == npages);
- ptlrpc_lprocfs_brw(req, nob);
- }
-
-out_lock:
- ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
-out_tls:
- ost_tls_put(req);
-out_bulk:
- if (desc)
- ptlrpc_free_bulk_nopin(desc);
-out:
- if (rc == 0) {
- oti_to_request(oti, req);
- target_committed_to_req(req);
- rc = ptlrpc_reply(req);
- } else if (!no_reply) {
- /* Only reply if there was no comms problem with bulk */
- target_committed_to_req(req);
- req->rq_status = rc;
- ptlrpc_error(req);
- } else {
- /* reply out callback would free */
- ptlrpc_req_drop_rs(req);
- LCONSOLE_WARN("%s: Bulk IO write error with %s (at %s), "
- "client will retry: rc %d\n",
- exp->exp_obd->obd_name,
- obd_uuid2str(&exp->exp_client_uuid),
- obd_export_nid2str(exp), rc);
- }
- memory_pressure_clr();
- RETURN(rc);
-}
-
-/**
- * Implementation of OST_SET_INFO.
- *
- * OST_SET_INFO is like ioctl(): heavily overloaded. Specifically, it takes a
- * "key" and a value RPC buffers as arguments, with the value's contents
- * interpreted according to the key.
- *
- * Value types that need swabbing have swabbing done explicitly, either here or
- * in functions called from here. This should be corrected: all swabbing should
- * be done in the capsule abstraction, as that will then allow us to move
- * swabbing exclusively to the client without having to modify server code
- * outside the capsule abstraction's implementation itself. To correct this
- * will require minor changes to the capsule abstraction; see the comments for
- * req_capsule_extend() in layout.c.
- */
-static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
-{
- struct ost_body *body = NULL, *repbody;
- char *key, *val = NULL;
- int keylen, vallen, rc = 0;
- int is_grant_shrink = 0;
- ENTRY;
-
- key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
- if (key == NULL) {
- DEBUG_REQ(D_HA, req, "no set_info key");
- RETURN(-EFAULT);
- }
- keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
- RCL_CLIENT);
-
- vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
- RCL_CLIENT);
-
- if ((is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK)))
- /* In this case the value is actually an RMF_OST_BODY, so we
- * transmutate the type of this PTLRPC */
- req_capsule_extend(&req->rq_pill, &RQF_OST_SET_GRANT_INFO);
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
-
- if (vallen) {
- if (is_grant_shrink) {
- body = req_capsule_client_get(&req->rq_pill,
- &RMF_OST_BODY);
- if (!body)
- RETURN(-EFAULT);
-
- repbody = req_capsule_server_get(&req->rq_pill,
- &RMF_OST_BODY);
- memcpy(repbody, body, sizeof(*body));
- val = (char*)repbody;
- } else {
- val = req_capsule_client_get(&req->rq_pill,
- &RMF_SETINFO_VAL);
- }
- }
-
- if (KEY_IS(KEY_EVICT_BY_NID)) {
- if (val && vallen)
- obd_export_evict_by_nid(exp->exp_obd, val);
- GOTO(out, rc = 0);
- } else if (KEY_IS(KEY_MDS_CONN) && ptlrpc_req_need_swab(req)) {
- if (vallen < sizeof(__u32))
- RETURN(-EFAULT);
- __swab32s((__u32 *)val);
- }
-
- /* OBD will also check if KEY_IS(KEY_GRANT_SHRINK), and will cast val to
- * a struct ost_body * value */
- rc = obd_set_info_async(req->rq_svc_thread->t_env, exp, keylen,
- key, vallen, val, NULL);
-out:
- lustre_msg_set_status(req->rq_repmsg, 0);
- RETURN(rc);
-}
-
-struct locked_region {
- cfs_list_t list;
- struct lustre_handle lh;
-};
-
-static int lock_region(struct obd_export *exp, struct obdo *oa,
- unsigned long long begin, unsigned long long end,
- cfs_list_t *locked)
-{
- struct locked_region *region = NULL;
- int rc;
-
- LASSERT(begin <= end);
- OBD_ALLOC_PTR(region);
- if (region == NULL)
- return -ENOMEM;
-
- rc = ost_lock_get(exp, oa, begin, end - begin, ®ion->lh, LCK_PR, 0);
- if (rc) {
- OBD_FREE_PTR(region);
- return rc;
- }
-
- CDEBUG(D_OTHER, "ost lock [%llu,%llu], lh=%p\n",
- begin, end, ®ion->lh);
- cfs_list_add(®ion->list, locked);
-
- return 0;
-}
-
-static int lock_zero_regions(struct obd_export *exp, struct obdo *oa,
- struct ll_user_fiemap *fiemap,
- cfs_list_t *locked)
+/**
+ * Validate oa from client.
+ * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
+ * req are valid.
+ * a. objects in Single MDT FS seq = FID_SEQ_OST_MDT0, oi_id != 0
+ * b. Echo objects (seq = 2): old echo clients still use oi_id/oi_seq to
+ * pack ost_id. Because a non-zero oi_seq makes it difficult to tell
+ * whether this is an oi_fid or a real ostid, this function checks
+ * OBD_CONNECT_FID and converts the ostid to a FID for old clients.
+ * c. Old FID-disable osc will send IDIF.
+ * d. new FID-enable osc/osp will send normal FID.
+ *
+ * And also oi_id/f_oid should always start from 1. oi_id/f_oid = 0 will
+ * be used for LAST_ID file, and only being accessed inside OST now.
+ */
+static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
+ struct obd_ioobj *ioobj)
{
- __u64 begin = fiemap->fm_start;
- unsigned int i;
int rc = 0;
- struct ll_fiemap_extent *fiemap_start = fiemap->fm_extents;
- ENTRY;
-
- CDEBUG(D_OTHER, "extents count %u\n", fiemap->fm_mapped_extents);
- for (i = 0; i < fiemap->fm_mapped_extents; i++) {
- if (fiemap_start[i].fe_logical > begin) {
- CDEBUG(D_OTHER, "ost lock [%llu,%llu]\n",
- begin, fiemap_start[i].fe_logical);
- rc = lock_region(exp, oa, begin,
- fiemap_start[i].fe_logical, locked);
- if (rc)
- RETURN(rc);
- }
-
- begin = fiemap_start[i].fe_logical + fiemap_start[i].fe_length;
- }
-
- if (begin < (fiemap->fm_start + fiemap->fm_length)) {
- CDEBUG(D_OTHER, "ost lock [%llu,%llu]\n",
- begin, fiemap->fm_start + fiemap->fm_length);
- rc = lock_region(exp, oa, begin,
- fiemap->fm_start + fiemap->fm_length, locked);
- }
-
- RETURN(rc);
-}
-
-static void unlock_zero_regions(struct obd_export *exp, cfs_list_t *locked)
-{
- struct locked_region *entry, *temp;
- cfs_list_for_each_entry_safe(entry, temp, locked, list) {
- CDEBUG(D_OTHER, "ost unlock lh=%p\n", &entry->lh);
- ost_lock_put(exp, &entry->lh, LCK_PR);
- cfs_list_del(&entry->list);
- OBD_FREE_PTR(entry);
- }
-}
-
-static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
-{
- void *key, *reply;
- int keylen, replylen, rc = 0;
- struct req_capsule *pill = &req->rq_pill;
- cfs_list_t locked = CFS_LIST_HEAD_INIT(locked);
- struct ll_fiemap_info_key *fm_key = NULL;
- struct ll_user_fiemap *fiemap;
- ENTRY;
-
- /* this common part for get_info rpc */
- key = req_capsule_client_get(pill, &RMF_SETINFO_KEY);
- if (key == NULL) {
- DEBUG_REQ(D_HA, req, "no get_info key");
- RETURN(-EFAULT);
- }
- keylen = req_capsule_get_size(pill, &RMF_SETINFO_KEY, RCL_CLIENT);
- if (KEY_IS(KEY_FIEMAP)) {
- fm_key = key;
- rc = ost_validate_obdo(exp, &fm_key->oa, NULL);
- if (rc)
- RETURN(rc);
- }
-
- rc = obd_get_info(req->rq_svc_thread->t_env, exp, keylen, key,
- &replylen, NULL, NULL);
- if (rc)
- RETURN(rc);
-
- req_capsule_set_size(pill, &RMF_GENERIC_DATA,
- RCL_SERVER, replylen);
+ /* Test oa for NULL before reading oa->o_oi: a NULL oa must fall
+ * through to the -EPROTO check in the else branch rather than be
+ * dereferenced by the echo-sequence test. */
+ if (unlikely(oa != NULL &&
+ !(exp_connect_flags(exp) & OBD_CONNECT_FID) &&
+ fid_seq_is_echo(oa->o_oi.oi.oi_seq))) {
+ /* Sigh 2.[123] client still sends echo req with oi_id = 0
+ * during create, and we will reset this to 1, since this
+ * oi_id is basically useless in the following create process,
+ * but oi_id == 0 will make it difficult to tell whether it is
+ * real FID or ost_id. */
+ oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1;
+ oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO;
+ oa->o_oi.oi_fid.f_ver = 0;
+ } else {
+ if (unlikely((oa == NULL) || ostid_id(&oa->o_oi) == 0))
+ GOTO(out, rc = -EPROTO);
- rc = req_capsule_server_pack(pill);
- if (rc)
- RETURN(rc);
-
- reply = req_capsule_server_get(pill, &RMF_GENERIC_DATA);
- if (reply == NULL)
- RETURN(-ENOMEM);
-
- if (KEY_IS(KEY_LAST_FID)) {
- void *val;
- int vallen;
-
- req_capsule_extend(pill, &RQF_OST_GET_INFO_LAST_FID);
- val = req_capsule_client_get(pill, &RMF_SETINFO_VAL);
- vallen = req_capsule_get_size(pill, &RMF_SETINFO_VAL,
- RCL_CLIENT);
- if (val != NULL && vallen > 0 && replylen >= vallen) {
- memcpy(reply, val, vallen);
- } else {
- CERROR("%s: invalid req val %p vallen %d replylen %d\n",
- exp->exp_obd->obd_name, val, vallen, replylen);
- RETURN(-EINVAL);
+ /* Note: this check might be forced in 2.5 or 2.6, i.e.
+ * all of the requests are required to setup FLGROUP */
+ if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
+ ostid_set_seq_mdt0(&oa->o_oi);
+ if (ioobj)
+ ostid_set_seq_mdt0(&ioobj->ioo_oid);
+ oa->o_valid |= OBD_MD_FLGROUP;
}
- }
- /* call again to fill in the reply buffer */
- rc = obd_get_info(req->rq_svc_thread->t_env, exp, keylen, key,
- &replylen, reply, NULL);
-
- /* LU-3219: Lock the sparse areas to make sure dirty flushed back
- * from client, then call fiemap again. */
- if (KEY_IS(KEY_FIEMAP) && (fm_key->oa.o_valid & OBD_MD_FLFLAGS) &&
- (fm_key->oa.o_flags & OBD_FL_SRVLOCK)) {
- fiemap = (struct ll_user_fiemap *)reply;
- fm_key = key;
-
- rc = lock_zero_regions(exp, &fm_key->oa, fiemap, &locked);
- if (rc == 0 && !cfs_list_empty(&locked))
- rc = obd_get_info(req->rq_svc_thread->t_env, exp,
- keylen, key, &replylen, reply, NULL);
- unlock_zero_regions(exp, &locked);
- if (rc)
- RETURN(rc);
+ if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
+ fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) ||
+ fid_seq_is_norm(ostid_seq(&oa->o_oi)) ||
+ fid_seq_is_echo(ostid_seq(&oa->o_oi)))))
+ GOTO(out, rc = -EPROTO);
}
- lustre_msg_set_status(req->rq_repmsg, 0);
-
- RETURN(rc);
-}
-
-static int ost_handle_quotactl(struct ptlrpc_request *req)
-{
- struct obd_quotactl *oqctl, *repoqc;
- int rc;
- ENTRY;
-
- oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
- if (oqctl == NULL)
- GOTO(out, rc = -EPROTO);
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- GOTO(out, rc);
-
- repoqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
- req->rq_status = obd_quotactl(req->rq_export, oqctl);
- *repoqc = *oqctl;
-
-out:
- RETURN(rc);
-}
-
-static int ost_handle_quotacheck(struct ptlrpc_request *req)
-{
- struct obd_quotactl *oqctl;
- int rc;
- ENTRY;
-
- oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
- if (oqctl == NULL)
- RETURN(-EPROTO);
-
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(-ENOMEM);
-
- /* deprecated, not used any more */
- req->rq_status = -EOPNOTSUPP;
- RETURN(-EOPNOTSUPP);
-}
-
-static int ost_llog_handle_connect(struct obd_export *exp,
- struct ptlrpc_request *req)
-{
- struct llogd_conn_body *body;
- int rc;
- ENTRY;
-
- body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY);
- rc = obd_llog_connect(exp, body);
- RETURN(rc);
-}
-
-#define ost_init_sec_none(reply) \
-do { \
- reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT | \
- OBD_CONNECT_RMT_CLIENT_FORCE | \
- OBD_CONNECT_OSS_CAPA); \
-} while (0)
-
-static int ost_init_sec_level(struct ptlrpc_request *req)
-{
- struct obd_export *exp = req->rq_export;
- struct req_capsule *pill = &req->rq_pill;
- struct obd_device *obd = exp->exp_obd;
- struct filter_obd *filter = &obd->u.filter;
- char *client = libcfs_nid2str(req->rq_peer.nid);
- struct obd_connect_data *data, *reply;
- int rc = 0, remote;
- ENTRY;
-
- data = req_capsule_client_get(pill, &RMF_CONNECT_DATA);
- reply = req_capsule_server_get(pill, &RMF_CONNECT_DATA);
- if (data == NULL || reply == NULL)
- RETURN(-EFAULT);
-
- /* connection from MDT is always trusted */
- if (req->rq_auth_usr_mdt) {
- ost_init_sec_none(reply);
- RETURN(0);
- }
-
- /* no GSS support case */
- if (!req->rq_auth_gss) {
- if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
- CWARN("client %s -> target %s does not user GSS, "
- "can not run under security level %d.\n",
- client, obd->obd_name, filter->fo_sec_level);
- RETURN(-EACCES);
- } else {
- ost_init_sec_none(reply);
- RETURN(0);
- }
- }
-
- /* old version case */
- if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
- !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
- if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
- CWARN("client %s -> target %s uses old version, "
- "can not run under security level %d.\n",
- client, obd->obd_name, filter->fo_sec_level);
- RETURN(-EACCES);
- } else {
- CWARN("client %s -> target %s uses old version, "
- "run under security level %d.\n",
- client, obd->obd_name, filter->fo_sec_level);
- ost_init_sec_none(reply);
- RETURN(0);
- }
- }
-
- remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
- if (remote) {
- if (!req->rq_auth_remote)
- CDEBUG(D_SEC, "client (local realm) %s -> target %s "
- "asked to be remote.\n", client, obd->obd_name);
- } else if (req->rq_auth_remote) {
- remote = 1;
- CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
- "as remote by default.\n", client, obd->obd_name);
- }
-
- if (remote) {
- if (!filter->fo_fl_oss_capa) {
- CDEBUG(D_SEC, "client %s -> target %s is set as remote,"
- " but OSS capabilities are not enabled: %d.\n",
- client, obd->obd_name, filter->fo_fl_oss_capa);
- RETURN(-EACCES);
- }
- }
-
- switch (filter->fo_sec_level) {
- case LUSTRE_SEC_NONE:
- if (!remote) {
- ost_init_sec_none(reply);
- break;
- } else {
- CDEBUG(D_SEC, "client %s -> target %s is set as remote, "
- "can not run under security level %d.\n",
- client, obd->obd_name, filter->fo_sec_level);
- RETURN(-EACCES);
- }
- case LUSTRE_SEC_REMOTE:
- if (!remote)
- ost_init_sec_none(reply);
- break;
- case LUSTRE_SEC_ALL:
- if (!remote) {
- reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
- OBD_CONNECT_RMT_CLIENT_FORCE);
- if (!filter->fo_fl_oss_capa)
- reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
- }
- break;
- default:
- RETURN(-EINVAL);
- }
-
- RETURN(rc);
-}
-
-/*
- * FIXME
- * this should be done in filter_connect()/filter_reconnect(), but
- * we can't obtain information like NID, which stored in incoming
- * request, thus can't decide what flavor to use. so we do it here.
- *
- * This hack should be removed after the OST stack be rewritten, just
- * like what we are doing in mdt_obd_connect()/mdt_obd_reconnect().
- */
-static int ost_connect_check_sptlrpc(struct ptlrpc_request *req)
-{
- struct obd_export *exp = req->rq_export;
- struct filter_obd *filter = &exp->exp_obd->u.filter;
- struct sptlrpc_flavor flvr;
- int rc = 0;
-
- if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
- LUSTRE_ECHO_NAME) == 0)) {
- exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
- return 0;
- }
-
- if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
- read_lock(&filter->fo_sptlrpc_lock);
- sptlrpc_target_choose_flavor(&filter->fo_sptlrpc_rset,
- req->rq_sp_from,
- req->rq_peer.nid,
- &flvr);
- read_unlock(&filter->fo_sptlrpc_lock);
-
- spin_lock(&exp->exp_lock);
-
- exp->exp_sp_peer = req->rq_sp_from;
- exp->exp_flvr = flvr;
-
- if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
- exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
- CERROR("unauthorized rpc flavor %x from %s, "
- "expect %x\n", req->rq_flvr.sf_rpc,
- libcfs_nid2str(req->rq_peer.nid),
- exp->exp_flvr.sf_rpc);
- rc = -EACCES;
- }
-
- spin_unlock(&exp->exp_lock);
- } else {
- if (exp->exp_sp_peer != req->rq_sp_from) {
- CERROR("RPC source %s doesn't match %s\n",
- sptlrpc_part2name(req->rq_sp_from),
- sptlrpc_part2name(exp->exp_sp_peer));
- rc = -EACCES;
- } else {
- rc = sptlrpc_target_export_check(exp, req);
- }
- }
-
- return rc;
-}
-
-/* Ensure that data and metadata are synced to the disk when lock is cancelled
- * (if requested) */
-int ost_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
- void *data, int flag)
-{
- struct lu_env env;
- __u32 sync_lock_cancel = 0;
- __u32 len = sizeof(sync_lock_cancel);
- int rc = 0;
-
- ENTRY;
+ if (ioobj != NULL) {
+ unsigned max_brw = ioobj_max_brw_get(ioobj);
- rc = lu_env_init(&env, LCT_DT_THREAD);
- if (unlikely(rc != 0))
- RETURN(rc);
-
- rc = obd_get_info(&env, lock->l_export, sizeof(KEY_SYNC_LOCK_CANCEL),
- KEY_SYNC_LOCK_CANCEL, &len, &sync_lock_cancel, NULL);
- if (rc == 0 && flag == LDLM_CB_CANCELING &&
- (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) &&
- (sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
- (sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
- lock->l_flags & LDLM_FL_CBPENDING))) {
- struct obd_info *oinfo;
- struct obdo *oa;
- int rc;
-
- OBD_ALLOC_PTR(oinfo);
- if (!oinfo)
- GOTO(out_env, rc = -ENOMEM);
- OBDO_ALLOC(oa);
- if (!oa) {
- OBD_FREE_PTR(oinfo);
- GOTO(out_env, rc = -ENOMEM);
+ if (unlikely((max_brw & (max_brw - 1)) != 0)) {
+ CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
+ ": rc = -EPROTO\n", exp->exp_obd->obd_name,
+ obd_export_nid2str(exp), max_brw,
+ POSTID(&oa->o_oi));
+ GOTO(out, rc = -EPROTO);
}
-
- ostid_res_name_to_id(&oa->o_oi, &lock->l_resource->lr_name);
- oa->o_valid = OBD_MD_FLID|OBD_MD_FLGROUP;
- oinfo->oi_oa = oa;
- oinfo->oi_capa = BYPASS_CAPA;
-
- rc = obd_sync(&env, lock->l_export, oinfo,
- lock->l_policy_data.l_extent.start,
- lock->l_policy_data.l_extent.end, NULL);
- if (rc)
- CERROR("Error %d syncing data on lock cancel\n", rc);
-
- OBDO_FREE(oa);
- OBD_FREE_PTR(oinfo);
+ ioobj->ioo_oid = oa->o_oi;
}
- rc = ldlm_server_blocking_ast(lock, desc, data, flag);
-out_env:
- lu_env_fini(&env);
- RETURN(rc);
-}
-
-static int ost_filter_recovery_request(struct ptlrpc_request *req,
- struct obd_device *obd, int *process)
-{
- switch (lustre_msg_get_opc(req->rq_reqmsg)) {
- case OST_CONNECT: /* This will never get here, but for completeness. */
- case OST_DISCONNECT:
- *process = 1;
- RETURN(0);
-
- case OBD_PING:
- case OST_CREATE:
- case OST_DESTROY:
- case OST_PUNCH:
- case OST_SETATTR:
- case OST_SYNC:
- case OST_WRITE:
- case OBD_LOG_CANCEL:
- case LDLM_ENQUEUE:
- *process = target_queue_recovery_request(req, obd);
- RETURN(0);
-
- default:
- DEBUG_REQ(D_WARNING, req, "not permitted during recovery");
- *process = -EAGAIN;
- RETURN(0);
- }
-}
-
-int ost_msg_check_version(struct lustre_msg *msg)
-{
- int rc;
-
- switch(lustre_msg_get_opc(msg)) {
- case OST_CONNECT:
- case OST_DISCONNECT:
- case OBD_PING:
- case SEC_CTX_INIT:
- case SEC_CTX_INIT_CONT:
- case SEC_CTX_FINI:
- rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
- if (rc)
- CERROR("bad opc %u version %08x, expecting %08x\n",
- lustre_msg_get_opc(msg),
- lustre_msg_get_version(msg),
- LUSTRE_OBD_VERSION);
- break;
- case OST_CREATE:
- case OST_DESTROY:
- case OST_GETATTR:
- case OST_SETATTR:
- case OST_WRITE:
- case OST_READ:
- case OST_PUNCH:
- case OST_STATFS:
- case OST_SYNC:
- case OST_SET_INFO:
- case OST_GET_INFO:
- case OST_QUOTACHECK:
- case OST_QUOTACTL:
- rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
- if (rc)
- CERROR("bad opc %u version %08x, expecting %08x\n",
- lustre_msg_get_opc(msg),
- lustre_msg_get_version(msg),
- LUSTRE_OST_VERSION);
- break;
- case LDLM_ENQUEUE:
- case LDLM_CONVERT:
- case LDLM_CANCEL:
- case LDLM_BL_CALLBACK:
- case LDLM_CP_CALLBACK:
- rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
- if (rc)
- CERROR("bad opc %u version %08x, expecting %08x\n",
- lustre_msg_get_opc(msg),
- lustre_msg_get_version(msg),
- LUSTRE_DLM_VERSION);
- break;
- case LLOG_ORIGIN_CONNECT:
- case OBD_LOG_CANCEL:
- rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
- if (rc)
- CERROR("bad opc %u version %08x, expecting %08x\n",
- lustre_msg_get_opc(msg),
- lustre_msg_get_version(msg),
- LUSTRE_LOG_VERSION);
- break;
- case OST_QUOTA_ADJUST_QUNIT:
- rc = -ENOTSUPP;
- CERROR("Quota adjust is deprecated as of 2.4.0\n");
- break;
- default:
- CERROR("Unexpected opcode %d\n", lustre_msg_get_opc(msg));
- rc = -ENOTSUPP;
- }
- return rc;
+out:
+ if (rc != 0)
+ CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
+ exp->exp_obd->obd_name, obd_export_nid2str(exp),
+ oa ? ostid_seq(&oa->o_oi) : -1,
+ oa ? ostid_id(&oa->o_oi) : -1, rc);
+ return rc;
}
struct ost_prolong_data {
RETURN(0);
}
-/* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
-int ost_handle(struct ptlrpc_request *req)
-{
- struct obd_trans_info trans_info = { 0, };
- struct obd_trans_info *oti = &trans_info;
- int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
- struct obd_device *obd = NULL;
- __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
- ENTRY;
-
- /* OST module is kept between remounts, but the last reference
- * to specific module (say, osd or ofd) kills all related keys
- * from the environment. so we have to refill it until the root
- * cause is fixed properly */
- lu_env_refill(req->rq_svc_thread->t_env);
-
- LASSERT(current->journal_info == NULL);
-
- /* primordial rpcs don't affect server recovery */
- switch (opc) {
- case SEC_CTX_INIT:
- case SEC_CTX_INIT_CONT:
- case SEC_CTX_FINI:
- GOTO(out, rc = 0);
- }
-
- req_capsule_init(&req->rq_pill, req, RCL_SERVER);
-
- if (opc != OST_CONNECT) {
- if (!class_connected_export(req->rq_export)) {
- CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
- opc, libcfs_id2str(req->rq_peer));
- req->rq_status = -ENOTCONN;
- GOTO(out, rc = -ENOTCONN);
- }
-
- obd = req->rq_export->exp_obd;
-
- /* Check for aborted recovery. */
- if (obd->obd_recovering) {
- rc = ost_filter_recovery_request(req, obd,
- &should_process);
- if (rc || !should_process)
- RETURN(rc);
- else if (should_process < 0) {
- req->rq_status = should_process;
- rc = ptlrpc_error(req);
- RETURN(rc);
- }
- }
- }
-
- oti_init(oti, req);
-
- rc = ost_msg_check_version(req->rq_reqmsg);
- if (rc)
- RETURN(rc);
-
- if (req && req->rq_reqmsg && req->rq_export &&
- (exp_connect_flags(req->rq_export) & OBD_CONNECT_JOBSTATS))
- oti->oti_jobid = lustre_msg_get_jobid(req->rq_reqmsg);
-
- switch (opc) {
- case OST_CONNECT: {
- CDEBUG(D_INODE, "connect\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_CONNECT);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET))
- RETURN(0);
- rc = target_handle_connect(req);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2))
- RETURN(0);
- if (!rc) {
- rc = ost_init_sec_level(req);
- if (!rc)
- rc = ost_connect_check_sptlrpc(req);
- }
- if (rc == 0) {
- struct obd_export *exp = req->rq_export;
- struct obd_connect_data *reply;
- /* Now that connection handling has completed
- * successfully, atomically update the connect flags
- * in the shared export data structure.*/
- reply = req_capsule_server_get(&req->rq_pill,
- &RMF_CONNECT_DATA);
- spin_lock(&exp->exp_lock);
- exp->exp_connect_data = *reply;
- spin_unlock(&exp->exp_lock);
- }
- break;
- }
- case OST_DISCONNECT:
- CDEBUG(D_INODE, "disconnect\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_DISCONNECT);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_DISCONNECT_NET))
- RETURN(0);
- rc = target_handle_disconnect(req);
- break;
- case OST_CREATE:
- CDEBUG(D_INODE, "create\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_CREATE);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_CREATE_NET))
- RETURN(0);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
- GOTO(out, rc = -EROFS);
- rc = ost_create(req->rq_export, req, oti);
- break;
- case OST_DESTROY:
- CDEBUG(D_INODE, "destroy\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_DESTROY);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_DESTROY_NET))
- RETURN(0);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
- GOTO(out, rc = -EROFS);
- rc = ost_destroy(req->rq_export, req, oti);
- break;
- case OST_GETATTR:
- CDEBUG(D_INODE, "getattr\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_GETATTR);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_GETATTR_NET))
- RETURN(0);
- rc = ost_getattr(req->rq_export, req);
- break;
- case OST_SETATTR:
- CDEBUG(D_INODE, "setattr\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_SETATTR);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_SETATTR_NET))
- RETURN(0);
- rc = ost_setattr(req->rq_export, req, oti);
- break;
- case OST_WRITE:
- req_capsule_set(&req->rq_pill, &RQF_OST_BRW_WRITE);
- CDEBUG(D_INODE, "write\n");
- /* req->rq_request_portal would be nice, if it was set */
- if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
- CERROR("%s: deny write request from %s to portal %u\n",
- req->rq_export->exp_obd->obd_name,
- obd_export_nid2str(req->rq_export),
- ptlrpc_req2svc(req)->srv_req_portal);
- GOTO(out, rc = -EPROTO);
- }
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
- RETURN(0);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
- GOTO(out, rc = -ENOSPC);
- if (OBD_FAIL_TIMEOUT(OBD_FAIL_OST_EROFS, 1))
- GOTO(out, rc = -EROFS);
- rc = ost_brw_write(req, oti);
- LASSERT(current->journal_info == NULL);
- /* ost_brw_write sends its own replies */
- RETURN(rc);
- case OST_READ:
- req_capsule_set(&req->rq_pill, &RQF_OST_BRW_READ);
- CDEBUG(D_INODE, "read\n");
- /* req->rq_request_portal would be nice, if it was set */
- if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
- CERROR("%s: deny read request from %s to portal %u\n",
- req->rq_export->exp_obd->obd_name,
- obd_export_nid2str(req->rq_export),
- ptlrpc_req2svc(req)->srv_req_portal);
- GOTO(out, rc = -EPROTO);
- }
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
- RETURN(0);
- rc = ost_brw_read(req, oti);
- LASSERT(current->journal_info == NULL);
- /* ost_brw_read sends its own replies */
- RETURN(rc);
- case OST_PUNCH:
- CDEBUG(D_INODE, "punch\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_PUNCH_NET))
- RETURN(0);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
- GOTO(out, rc = -EROFS);
- rc = ost_punch(req->rq_export, req, oti);
- break;
- case OST_STATFS:
- CDEBUG(D_INODE, "statfs\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_STATFS);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_NET))
- RETURN(0);
- rc = ost_statfs(req);
- break;
- case OST_SYNC:
- CDEBUG(D_INODE, "sync\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_SYNC);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_SYNC_NET))
- RETURN(0);
- rc = ost_sync(req->rq_export, req, oti);
- break;
- case OST_SET_INFO:
- DEBUG_REQ(D_INODE, req, "set_info");
- req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
- rc = ost_set_info(req->rq_export, req);
- break;
- case OST_GET_INFO:
- DEBUG_REQ(D_INODE, req, "get_info");
- req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC);
- rc = ost_get_info(req->rq_export, req);
- break;
- case OST_QUOTACHECK:
- CDEBUG(D_INODE, "quotacheck\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACHECK);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACHECK_NET))
- RETURN(0);
- rc = ost_handle_quotacheck(req);
- break;
- case OST_QUOTACTL:
- CDEBUG(D_INODE, "quotactl\n");
- req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACTL);
- if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACTL_NET))
- RETURN(0);
- rc = ost_handle_quotactl(req);
- break;
- case OBD_PING:
- DEBUG_REQ(D_INODE, req, "ping");
- req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
- rc = target_handle_ping(req);
- break;
- /* FIXME - just reply status */
- case LLOG_ORIGIN_CONNECT:
- DEBUG_REQ(D_INODE, req, "log connect");
- req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_CONNECT);
- rc = ost_llog_handle_connect(req->rq_export, req);
- req->rq_status = rc;
- rc = req_capsule_server_pack(&req->rq_pill);
- if (rc)
- RETURN(rc);
- RETURN(ptlrpc_reply(req));
- case LDLM_ENQUEUE:
- CDEBUG(D_INODE, "enqueue\n");
- req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE_NET))
- RETURN(0);
- rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
- ost_blocking_ast,
- ldlm_server_glimpse_ast);
- fail = OBD_FAIL_OST_LDLM_REPLY_NET;
- break;
- case LDLM_CONVERT:
- CDEBUG(D_INODE, "convert\n");
- req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT_NET))
- RETURN(0);
- rc = ldlm_handle_convert(req);
- break;
- case LDLM_CANCEL:
- CDEBUG(D_INODE, "cancel\n");
- req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
- if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL_NET))
- RETURN(0);
- rc = ldlm_handle_cancel(req);
- break;
- case LDLM_BL_CALLBACK:
- case LDLM_CP_CALLBACK:
- CDEBUG(D_INODE, "callback\n");
- CERROR("callbacks should not happen on OST\n");
- /* fall through */
- default:
- CERROR("Unexpected opcode %d\n", opc);
- req->rq_status = -ENOTSUPP;
- rc = ptlrpc_error(req);
- RETURN(rc);
- }
-
- LASSERT(current->journal_info == NULL);
-
- EXIT;
- /* If we're DISCONNECTing, the export_data is already freed */
- if (!rc && opc != OST_DISCONNECT)
- target_committed_to_req(req);
-
-out:
- if (!rc)
- oti_to_request(oti, req);
-
- target_send_reply(req, rc, fail);
- return 0;
-}
-EXPORT_SYMBOL(ost_handle);
-
-/*
- * free per-thread pool created by ost_io_thread_init().
- */
-static void ost_io_thread_done(struct ptlrpc_thread *thread)
-{
- struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
- * Storage */
-
- ENTRY;
-
- LASSERT(thread != NULL);
-
- /*
- * be prepared to handle partially-initialized pools (because this is
- * called from ost_io_thread_init() for cleanup.
- */
- tls = thread->t_data;
- if (tls != NULL) {
- OBD_FREE_PTR(tls);
- thread->t_data = NULL;
- }
- EXIT;
-}
-
-/*
- * initialize per-thread page pool (bug 5137).
- */
-static int ost_io_thread_init(struct ptlrpc_thread *thread)
-{
- struct ost_thread_local_cache *tls;
-
- ENTRY;
-
- LASSERT(thread != NULL);
- LASSERT(thread->t_data == NULL);
-
- OBD_ALLOC_PTR(tls);
- if (tls == NULL)
- RETURN(-ENOMEM);
- thread->t_data = tls;
- RETURN(0);
-}
-
#define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
static struct cfs_cpt_table *ost_io_cptable;
.cc_pattern = oss_cpts,
},
.psc_ops = {
- .so_req_handler = ost_handle,
+ .so_req_handler = tgt_request_handle,
.so_req_printer = target_print_req,
.so_hpreq_handler = ptlrpc_hpreq_handler,
},
.cc_pattern = oss_cpts,
},
.psc_ops = {
- .so_req_handler = ost_handle,
+ .so_req_handler = tgt_request_handle,
.so_req_printer = target_print_req,
},
};
oss_io_cpts : NULL,
},
.psc_ops = {
- .so_thr_init = ost_io_thread_init,
- .so_thr_done = ost_io_thread_done,
- .so_req_handler = ost_handle,
+ .so_thr_init = tgt_io_thread_init,
+ .so_thr_done = tgt_io_thread_done,
+ .so_req_handler = tgt_request_handle,
.so_hpreq_handler = ost_io_hpreq_handler,
.so_req_printer = target_print_req,
},
GOTO(out_io, rc);
}
-#if 0
/* Object update service */
memset(&svc_conf, 0, sizeof(svc_conf));
svc_conf = (typeof(svc_conf)) {
ost->ost_out_service = NULL;
GOTO(out_seq, rc);
}
-#endif
+
ping_evictor_start();
RETURN(0);
+out_seq:
+ ptlrpc_unregister_service(ost->ost_seq_service);
+ ost->ost_seq_service = NULL;
out_io:
ptlrpc_unregister_service(ost->ost_io_service);
ost->ost_io_service = NULL;
ptlrpc_unregister_service(ost->ost_create_service);
ptlrpc_unregister_service(ost->ost_io_service);
ptlrpc_unregister_service(ost->ost_seq_service);
-#if 0
ptlrpc_unregister_service(ost->ost_out_service);
-#endif
+
ost->ost_service = NULL;
ost->ost_create_service = NULL;
ost->ost_io_service = NULL;
return rc;
}
-struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
-{
- return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
-}
-
/* use obd ops to offer management infrastructure */
static struct obd_ops ost_obd_ops = {
.o_owner = THIS_MODULE,
static int __init ost_init(void)
{
- struct lprocfs_static_vars lvars;
- int rc;
- ENTRY;
+ struct lprocfs_static_vars lvars;
+ int rc;
- ost_page_to_corrupt = alloc_page(GFP_IOFS);
+ ENTRY;
lprocfs_ost_init_vars(&lvars);
rc = class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
static void /*__exit*/ ost_exit(void)
{
- if (ost_page_to_corrupt)
- page_cache_release(ost_page_to_corrupt);
-
- class_unregister_type(LUSTRE_OSS_NAME);
+ class_unregister_type(LUSTRE_OSS_NAME);
}
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
#define OSS_SERVICE_WATCHDOG_FACTOR 2
-/*
- * tunables for per-thread page pool (bug 5137)
- */
-#define OST_THREAD_POOL_SIZE PTLRPC_MAX_BRW_PAGES /* pool size in pages */
-#define OST_THREAD_POOL_GFP GFP_HIGHUSER /* GFP mask for pool pages */
-
-struct page;
-struct niobuf_local;
-struct niobuf_remote;
-struct ptlrpc_request;
-
-/*
- * struct ost_thread_local_cache is allocated and initialized for each OST
- * thread by ost_thread_init().
- */
-struct ost_thread_local_cache {
- /*
- * pool of nio buffers used by write-path
- */
- struct niobuf_local local[OST_THREAD_POOL_SIZE];
- unsigned int temporary:1;
-};
-
-struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r);
-
#ifdef LPROCFS
void lprocfs_ost_init_vars(struct lprocfs_static_vars *lvars);
#else
static const struct req_msg_field *ost_get_info_generic_client[] = {
&RMF_PTLRPC_BODY,
- &RMF_SETINFO_KEY
+ &RMF_GETINFO_KEY
};
static const struct req_msg_field *ost_get_last_id_server[] = {
&RMF_OBD_ID
};
+/*
+ * Client-side request fields used by RQF_OST_GET_INFO_LAST_FID:
+ * the ptlrpc body, the getinfo key and a FID.
+ */
+static const struct req_msg_field *ost_get_last_fid_client[] = {
+ &RMF_PTLRPC_BODY,
+ &RMF_GETINFO_KEY,
+ &RMF_FID,
+};
+
static const struct req_msg_field *ost_get_last_fid_server[] = {
&RMF_PTLRPC_BODY,
&RMF_FID,
&RQF_OST_BRW_WRITE,
&RQF_OST_STATFS,
&RQF_OST_SET_GRANT_INFO,
- &RQF_OST_GET_INFO_GENERIC,
+ &RQF_OST_GET_INFO,
&RQF_OST_GET_INFO_LAST_ID,
&RQF_OST_GET_INFO_LAST_FID,
&RQF_OST_SET_INFO_LAST_FID,
ost_body_only);
EXPORT_SYMBOL(RQF_OST_SET_GRANT_INFO);
-struct req_format RQF_OST_GET_INFO_GENERIC =
+struct req_format RQF_OST_GET_INFO =
DEFINE_REQ_FMT0("OST_GET_INFO", ost_get_info_generic_client,
ost_get_info_generic_server);
-EXPORT_SYMBOL(RQF_OST_GET_INFO_GENERIC);
+EXPORT_SYMBOL(RQF_OST_GET_INFO);
struct req_format RQF_OST_GET_INFO_LAST_ID =
DEFINE_REQ_FMT0("OST_GET_INFO_LAST_ID", ost_get_info_generic_client,
EXPORT_SYMBOL(RQF_OST_GET_INFO_LAST_ID);
struct req_format RQF_OST_GET_INFO_LAST_FID =
- DEFINE_REQ_FMT0("OST_GET_INFO_LAST_FID", obd_set_info_client,
+ DEFINE_REQ_FMT0("OST_GET_INFO_LAST_FID", ost_get_last_fid_client,
ost_get_last_fid_server);
EXPORT_SYMBOL(RQF_OST_GET_INFO_LAST_FID);
#include <obd.h>
#include <obd_class.h>
+#include <obd_cksum.h>
#include "tgt_internal.h"
RETURN(rc);
}
+/**
+ * Validate oa from client.
+ * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
+ * req are valid.
+ * a. objects in Single MDT FS seq = FID_SEQ_OST_MDT0, oi_id != 0
+ * b. Echo objects(seq = 2), old echo client still use oi_id/oi_seq to
+ * pack ost_id. Because non-zero oi_seq will make it difficult to tell
+ * whether this is oi_fid or real ostid. So it will check
+ * OBD_CONNECT_FID, then convert the ostid to FID for old client.
+ * c. Old FID-disable osc will send IDIF.
+ * d. new FID-enable osc/osp will send normal FID.
+ *
+ * And also oi_id/f_oid should always start from 1. oi_id/f_oid = 0 will
+ * be used for LAST_ID file, and only being accessed inside OST now.
+ *
+ * \param[in] tsi target session info; its export supplies the connect
+ * flags and the names used in the error message
+ * \param[in,out] oa obdo sent by the client; it may be rewritten in place
+ * (echo FID conversion, default MDT0 sequence and
+ * OBD_MD_FLGROUP)
+ *
+ * \retval 0 the object id is acceptable
+ * \retval -EPROTO the id is zero while OBD_MD_FLID is set, or the
+ * sequence is not IDIF, MDT0, normal or echo
+ */
+int tgt_validate_obdo(struct tgt_session_info *tsi, struct obdo *oa)
+{
+ int rc;
+
+ ENTRY;
+
+ if (unlikely(!(exp_connect_flags(tsi->tsi_exp) & OBD_CONNECT_FID) &&
+ fid_seq_is_echo(oa->o_oi.oi.oi_seq))) {
+ /* Sigh 2.[123] client still sends echo req with oi_id = 0
+ * during create, and we will reset this to 1, since this
+ * oi_id is basically useless in the following create process,
+ * but oi_id == 0 will make it difficult to tell whether it is
+ * real FID or ost_id. */
+ oa->o_oi.oi_fid.f_oid = oa->o_oi.oi.oi_id ?: 1;
+ oa->o_oi.oi_fid.f_seq = FID_SEQ_ECHO;
+ oa->o_oi.oi_fid.f_ver = 0;
+ } else {
+ /* an explicitly supplied object id must never be zero */
+ if (unlikely((oa->o_valid & OBD_MD_FLID &&
+ ostid_id(&oa->o_oi) == 0)))
+ GOTO(out, rc = -EPROTO);
+
+ /* Note: this check might be forced in 2.5 or 2.6, i.e.
+ * all of the requests are required to setup FLGROUP */
+ if (unlikely(!(oa->o_valid & OBD_MD_FLGROUP))) {
+ ostid_set_seq_mdt0(&oa->o_oi);
+ oa->o_valid |= OBD_MD_FLGROUP;
+ }
+
+ /* only IDIF, MDT0, normal and echo sequences are accepted */
+ if (unlikely(!(fid_seq_is_idif(ostid_seq(&oa->o_oi)) ||
+ fid_seq_is_mdt0(ostid_seq(&oa->o_oi)) ||
+ fid_seq_is_norm(ostid_seq(&oa->o_oi)) ||
+ fid_seq_is_echo(ostid_seq(&oa->o_oi)))))
+ GOTO(out, rc = -EPROTO);
+ }
+ RETURN(0);
+out:
+ CERROR("%s: client %s sent bad object "DOSTID": rc = %d\n",
+ tgt_name(tsi->tsi_tgt), obd_export_nid2str(tsi->tsi_exp),
+ ostid_seq(&oa->o_oi), ostid_id(&oa->o_oi), rc);
+ return rc;
+}
+EXPORT_SYMBOL(tgt_validate_obdo);
+
+/*
+ * Unpack and validate the OST body of a request and cache the results in
+ * the session info.
+ *
+ * The obdo is checked with tgt_validate_obdo(), a capability buffer is
+ * required when OBD_MD_FLOSSCAPA is set, and the body is stored in
+ * tsi->tsi_ost_body. When the request format has an obd_ioobj field, the
+ * ioobj and the remote niobufs must be present and the ioobj max brw
+ * value must be zero or a power of two. When OBD_MD_FLID is set, the
+ * object id is converted into tsi->tsi_fid and tsi->tsi_resid.
+ *
+ * With HABEO_CORPUS in \a flags a body without OBD_MD_FLID is rejected.
+ *
+ * Returns 0 on success; -EFAULT, -EPROTO, -EINVAL or the error returned
+ * by tgt_validate_obdo()/ostid_to_fid() otherwise.
+ */
+static int tgt_ost_body_unpack(struct tgt_session_info *tsi, __u32 flags)
+{
+ struct ost_body *body;
+ struct req_capsule *pill = tsi->tsi_pill;
+ struct lustre_capa *capa;
+ struct obd_ioobj *ioo;
+ int rc;
+
+ ENTRY;
+
+ body = req_capsule_client_get(pill, &RMF_OST_BODY);
+ if (body == NULL)
+ RETURN(-EFAULT);
+
+ rc = tgt_validate_obdo(tsi, &body->oa);
+ if (rc)
+ RETURN(rc);
+
+ /* the capability is only checked for presence here */
+ if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
+ capa = req_capsule_client_get(tsi->tsi_pill, &RMF_CAPA1);
+ if (capa == NULL) {
+ CERROR("%s: OSSCAPA flag is set without capability\n",
+ tgt_name(tsi->tsi_tgt));
+ RETURN(-EFAULT);
+ }
+ }
+
+ tsi->tsi_ost_body = body;
+
+ if (req_capsule_has_field(pill, &RMF_OBD_IOOBJ, RCL_CLIENT)) {
+ unsigned max_brw;
+ struct niobuf_remote *rnb;
+
+ ioo = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
+ if (ioo == NULL)
+ RETURN(-EPROTO);
+
+ rnb = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
+ if (rnb == NULL)
+ RETURN(-EPROTO);
+
+ /* reject a max brw value with more than one bit set */
+ max_brw = ioobj_max_brw_get(ioo);
+ if (unlikely((max_brw & (max_brw - 1)) != 0)) {
+ CERROR("%s: client %s sent bad ioobj max %u for "DOSTID
+ ": rc = %d\n", tgt_name(tsi->tsi_tgt),
+ obd_export_nid2str(tsi->tsi_exp), max_brw,
+ POSTID(&body->oa.o_oi), -EPROTO);
+ RETURN(-EPROTO);
+ }
+ /* the ioobj id is overwritten with the validated body id */
+ ioo->ioo_oid = body->oa.o_oi;
+ }
+
+ if (!(body->oa.o_valid & OBD_MD_FLID)) {
+ if (flags & HABEO_CORPUS) {
+ CERROR("%s: OBD_MD_FLID flag is not set in ost_body "
+ "but OID/FID is mandatory with HABEO_CORPUS\n",
+ tgt_name(tsi->tsi_tgt));
+ RETURN(-EPROTO);
+ } else {
+ RETURN(0);
+ }
+ }
+
+ rc = ostid_to_fid(&tsi->tsi_fid, &body->oa.o_oi, 0);
+ if (rc != 0)
+ RETURN(rc);
+
+ if (!fid_is_sane(&tsi->tsi_fid)) {
+ CERROR("%s: invalid FID: "DFID"\n", tgt_name(tsi->tsi_tgt),
+ PFID(&tsi->tsi_fid));
+ RETURN(-EINVAL);
+ }
+
+ ost_fid_build_resid(&tsi->tsi_fid, &tsi->tsi_resid);
+
+ /*
+ * OST doesn't get object in advance for further use to prevent
+ * situations with nested object_find which is potential deadlock.
+ */
+ tsi->tsi_corpus = NULL;
+ /* rc is 0 here: every failure above has already returned */
+ RETURN(rc);
+}
+
static int tgt_unpack_req_pack_rep(struct tgt_session_info *tsi, __u32 flags)
{
struct req_capsule *pill = tsi->tsi_pill;
if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT)) {
rc = tgt_mdt_body_unpack(tsi, flags);
+ } else if (req_capsule_has_field(pill, &RMF_OST_BODY, RCL_CLIENT)) {
+ rc = tgt_ost_body_unpack(tsi, flags);
} else {
rc = 0;
}
- if (flags & HABEO_REFERO) {
+ if (rc == 0 && flags & HABEO_REFERO) {
/* Pack reply */
if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
LASSERT(current->journal_info == NULL);
- /*
- * If we're DISCONNECTing, the export_data is already freed
- *
- * WAS if (likely(... && h->mh_opc != MDS_DISCONNECT))
- */
if (likely(rc == 0 && req->rq_export))
target_committed_to_req(req);
case SEQ_QUERY:
case FLD_QUERY:
case LDLM_ENQUEUE:
+ case OST_CREATE:
+ case OST_DESTROY:
+ case OST_PUNCH:
+ case OST_SETATTR:
+ case OST_SYNC:
+ case OST_WRITE:
*process = target_queue_recovery_request(req, obd);
RETURN(0);
rc = ptlrpc_error(req);
GOTO(out, rc);
}
+ /* recovery-small test 18c asks to drop connect reply */
+ if (unlikely(opc == OST_CONNECT &&
+ OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2)))
+ GOTO(out, rc = 0);
}
if (unlikely(!class_connected_export(req->rq_export))) {
tsi->tsi_tgt = tgt = class_exp2tgt(req->rq_export);
tsi->tsi_exp = req->rq_export;
+ if (exp_connect_flags(req->rq_export) & OBD_CONNECT_JOBSTATS)
+ tsi->tsi_jobid = lustre_msg_get_jobid(req->rq_reqmsg);
+ else
+ tsi->tsi_jobid = NULL;
request_fail_id = tgt->lut_request_fail_id;
tsi->tsi_reply_fail_id = tgt->lut_reply_fail_id;
tsi->tsi_env = NULL;
tsi->tsi_mdt_body = NULL;
tsi->tsi_dlm_req = NULL;
+ fid_zero(&tsi->tsi_fid);
+ memset(&tsi->tsi_resid, 0, sizeof tsi->tsi_resid);
return rc;
}
EXPORT_SYMBOL(tgt_request_handle);
return rc;
}
+/*
+ * Fetch the current sptlrpc rules for the target's obd device, update the
+ * export flavors against them and then swap them into the target under
+ * lut_sptlrpc_lock, freeing the previous rule set.
+ *
+ * Returns 0 on success or the error from sptlrpc_conf_target_get_rules().
+ */
+int tgt_adapt_sptlrpc_conf(struct lu_target *tgt, int initial)
+{
+	struct sptlrpc_rule_set	fresh_rules;
+	int			rc;
+
+	sptlrpc_rule_set_init(&fresh_rules);
+
+	rc = sptlrpc_conf_target_get_rules(tgt->lut_obd, &fresh_rules,
+					   initial);
+	if (rc != 0) {
+		CERROR("%s: failed get sptlrpc rules: rc = %d\n",
+		       tgt_name(tgt), rc);
+		return rc;
+	}
+
+	sptlrpc_target_update_exp_flavor(tgt->lut_obd, &fresh_rules);
+
+	/* replace the installed rule set while holding the write lock */
+	write_lock(&tgt->lut_sptlrpc_lock);
+	sptlrpc_rule_set_free(&tgt->lut_sptlrpc_rset);
+	tgt->lut_sptlrpc_rset = fresh_rules;
+	write_unlock(&tgt->lut_sptlrpc_lock);
+
+	return 0;
+}
+EXPORT_SYMBOL(tgt_adapt_sptlrpc_conf);
+
int tgt_connect(struct tgt_session_info *tsi)
{
struct ptlrpc_request *req = tgt_ses_req(tsi);
};
EXPORT_SYMBOL(tgt_obd_handlers);
+/*
+ * Sync either the whole bottom device (obj == NULL) or a single object.
+ * An object is only synced when its version is newer than the last
+ * committed transaction number of the obd device; otherwise 0 is returned
+ * without doing anything.
+ */
+int tgt_sync(const struct lu_env *env, struct lu_target *tgt,
+	     struct dt_object *obj)
+{
+	ENTRY;
+
+	/* if no objid is specified, it means "sync whole filesystem" */
+	if (obj == NULL)
+		RETURN(dt_sync(env, tgt->lut_bottom));
+
+	/* nothing newer than the last commit: no sync needed */
+	if (!(dt_version_get(env, obj) > tgt->lut_obd->obd_last_committed))
+		RETURN(0);
+
+	RETURN(dt_object_sync(env, obj));
+}
+EXPORT_SYMBOL(tgt_sync);
/*
* Unified target DLM handlers.
*/
+
+/* Ensure that data and metadata are synced to the disk when lock is cancelled
+ * (if requested)
+ *
+ * The sync is attempted only for LDLM_CB_CANCELING on a lock granted in
+ * LCK_PW or LCK_GROUP mode, and only when lut_sync_lock_cancel is
+ * ALWAYS_SYNC_ON_CANCEL, or BLOCKING_SYNC_ON_CANCEL with LDLM_FL_CBPENDING
+ * set on the lock. Afterwards ldlm_server_blocking_ast() is called and its
+ * result is returned, so errors from the sync step are only logged or
+ * dropped. The one exception is a lu_env_init() failure, which is returned
+ * directly without calling ldlm_server_blocking_ast(). */
+int tgt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+ void *data, int flag)
+{
+ struct lu_env env;
+ struct lu_target *tgt;
+ struct dt_object *obj;
+ struct lu_fid fid;
+ int rc = 0;
+
+ ENTRY;
+
+ tgt = class_exp2tgt(lock->l_export);
+
+ if (flag == LDLM_CB_CANCELING &&
+ (lock->l_granted_mode & (LCK_PW | LCK_GROUP)) &&
+ (tgt->lut_sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
+ (tgt->lut_sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
+ lock->l_flags & LDLM_FL_CBPENDING))) {
+ rc = lu_env_init(&env, LCT_DT_THREAD);
+ if (unlikely(rc != 0))
+ RETURN(rc);
+
+ /* the object to sync is derived from the lock resource name */
+ ost_fid_from_resid(&fid, &lock->l_resource->lr_name);
+ obj = dt_locate(&env, tgt->lut_bottom, &fid);
+ if (IS_ERR(obj))
+ GOTO(err_env, rc = PTR_ERR(obj));
+
+ if (!dt_object_exists(obj))
+ GOTO(err_put, rc = -ENOENT);
+
+ rc = tgt_sync(&env, tgt, obj);
+ if (rc < 0) {
+ CERROR("%s: sync failed on lock cancel: rc = %d\n",
+ tgt_name(tgt), rc);
+ }
+err_put:
+ lu_object_put(&env, &obj->do_lu);
+err_env:
+ lu_env_fini(&env);
+ }
+
+ /* rc from the sync step above is overwritten here */
+ rc = ldlm_server_blocking_ast(lock, desc, data, flag);
+ RETURN(rc);
+}
+
struct ldlm_callback_suite tgt_dlm_cbs = {
.lcs_completion = ldlm_server_completion_ast,
- .lcs_blocking = ldlm_server_blocking_ast,
+ .lcs_blocking = tgt_blocking_ast,
.lcs_glimpse = ldlm_server_glimpse_ast
};
TGT_SEC_HDL_VAR(0, SEC_CTX_FINI, tgt_sec_ctx_handle),
};
EXPORT_SYMBOL(tgt_sec_ctx_handlers);
+
+/*
+ * Allocate the per-thread big cache (bug 5137) and attach it to the
+ * service thread through t_data. The thread must not have t_data set yet.
+ *
+ * Returns 0 on success or -ENOMEM when the allocation fails.
+ */
+int tgt_io_thread_init(struct ptlrpc_thread *thread)
+{
+	struct tgt_thread_big_cache *cache;
+
+	ENTRY;
+
+	LASSERT(thread != NULL);
+	LASSERT(thread->t_data == NULL);
+
+	OBD_ALLOC_LARGE(cache, sizeof(*cache));
+	if (cache != NULL) {
+		thread->t_data = cache;
+		RETURN(0);
+	}
+
+	RETURN(-ENOMEM);
+}
+EXPORT_SYMBOL(tgt_io_thread_init);
+
+/*
+ * free per-thread pool created by tgt_io_thread_init().
+ */
+void tgt_io_thread_done(struct ptlrpc_thread *thread)
+{
+ struct tgt_thread_big_cache *tbc;
+
+ ENTRY;
+
+ LASSERT(thread != NULL);
+
+ /*
+ * be prepared to handle a thread whose pool was never allocated:
+ * t_data is only freed and reset when it is not NULL.
+ */
+ tbc = thread->t_data;
+ if (tbc != NULL) {
+ OBD_FREE_LARGE(tbc, sizeof(*tbc));
+ thread->t_data = NULL;
+ }
+ EXIT;
+}
+EXPORT_SYMBOL(tgt_io_thread_done);
+/**
+ * Take a server-side extent DLM lock covering [start, end] on \a res_id.
+ *
+ * The start is rounded down and the end rounded up to page boundaries.
+ * An \a end of OBD_OBJECT_EOF, or an \a end below \a start, locks up to
+ * the end of the object.
+ *
+ * \retval 0 the lock was enqueued, \a lh holds its handle
+ * \retval -EIO ldlm_cli_enqueue_local() did not return ELDLM_OK
+ */
+int tgt_extent_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
+		    __u64 start, __u64 end, struct lustre_handle *lh,
+		    int mode, __u64 *flags)
+{
+	ldlm_policy_data_t	extent;
+	int			enq_rc;
+
+	ENTRY;
+
+	LASSERT(lh != NULL);
+	LASSERT(ns != NULL);
+	LASSERT(!lustre_handle_is_used(lh));
+
+	extent.l_extent.gid = 0;
+	extent.l_extent.start = start & CFS_PAGE_MASK;
+
+	/* EOF, or an end before the start, means "lock till end of file" */
+	if (end != OBD_OBJECT_EOF && end >= start)
+		extent.l_extent.end = end | ~CFS_PAGE_MASK;
+	else
+		extent.l_extent.end = OBD_OBJECT_EOF;
+
+	enq_rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_EXTENT, &extent,
+					mode, flags, ldlm_blocking_ast,
+					ldlm_completion_ast,
+					ldlm_glimpse_ast, NULL, 0,
+					LVB_T_NONE, NULL, lh);
+	if (enq_rc != ELDLM_OK)
+		RETURN(-EIO);
+
+	RETURN(0);
+}
+EXPORT_SYMBOL(tgt_extent_lock);
+
+/*
+ * Drop the reference taken on an extent lock. The handle must be in use;
+ * \a mode is passed to ldlm_lock_decref() unchanged.
+ */
+void tgt_extent_unlock(struct lustre_handle *lh, ldlm_mode_t mode)
+{
+ LASSERT(lustre_handle_is_used(lh));
+ ldlm_lock_decref(lh, mode);
+}
+EXPORT_SYMBOL(tgt_extent_unlock);
+
+/*
+ * Take a server-side extent lock for a bulk request when the client asked
+ * for it with OBD_BRW_SRVLOCK.
+ *
+ * Nothing is locked (and 0 is returned) when there are no buffers or the
+ * first buffer does not carry OBD_BRW_SRVLOCK. If the first buffer has
+ * the flag, every other buffer must have it too, otherwise -EFAULT is
+ * returned. The locked range runs from the offset of the first buffer to
+ * the last byte of the last buffer.
+ */
+int tgt_brw_lock(struct ldlm_namespace *ns, struct ldlm_res_id *res_id,
+		 struct obd_ioobj *obj, struct niobuf_remote *nb,
+		 struct lustre_handle *lh, int mode)
+{
+	const struct niobuf_remote	*tail;
+	__u64				 lock_flags = 0;
+	int				 count = obj->ioo_bufcnt;
+	int				 idx;
+
+	ENTRY;
+
+	LASSERT(mode == LCK_PR || mode == LCK_PW);
+	LASSERT(!lustre_handle_is_used(lh));
+
+	if (count == 0)
+		RETURN(0);
+	if (!(nb[0].flags & OBD_BRW_SRVLOCK))
+		RETURN(0);
+
+	/* the flag must be set consistently on all remaining buffers */
+	for (idx = 1; idx < count; idx++) {
+		if (!(nb[idx].flags & OBD_BRW_SRVLOCK))
+			RETURN(-EFAULT);
+	}
+
+	tail = &nb[count - 1];
+	RETURN(tgt_extent_lock(ns, res_id, nb[0].offset,
+			       tail->offset + tail->len - 1,
+			       lh, mode, &lock_flags));
+}
+EXPORT_SYMBOL(tgt_brw_lock);
+
+/*
+ * Release the lock taken by tgt_brw_lock(), if any. The handle is
+ * expected to be in use exactly when the request has buffers and the
+ * first one carries OBD_BRW_SRVLOCK.
+ */
+void tgt_brw_unlock(struct obd_ioobj *obj, struct niobuf_remote *niob,
+		    struct lustre_handle *lh, int mode)
+{
+	ENTRY;
+
+	LASSERT(mode == LCK_PR || mode == LCK_PW);
+	LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
+		lustre_handle_is_used(lh));
+
+	if (!lustre_handle_is_used(lh)) {
+		EXIT;
+		return;
+	}
+
+	tgt_extent_unlock(lh, mode);
+	EXIT;
+}
+EXPORT_SYMBOL(tgt_brw_unlock);
+
+/*
+ * Fault-injection helper: copy the data of bulk page \a idx into
+ * tgt_page_to_corrupt, overwrite its first bytes with \a tag (at most
+ * four bytes) and make the bulk descriptor point at the corrupted copy.
+ * The original page is left unmodified. Both temporary mappings are
+ * released before returning; nothing is mapped when the spare page is
+ * missing.
+ */
+static void tgt_bulk_corrupt_page(struct lu_target *tgt,
+				  struct ptlrpc_bulk_desc *desc, int idx,
+				  const char *tag)
+{
+	int		 off = desc->bd_iov[idx].kiov_offset & ~CFS_PAGE_MASK;
+	int		 len = desc->bd_iov[idx].kiov_len;
+	struct page	*orig = desc->bd_iov[idx].kiov_page;
+	struct page	*np = tgt_page_to_corrupt;
+	char		*src;
+	char		*dst;
+
+	if (np == NULL) {
+		CERROR("%s: can't alloc page for corruption\n",
+		       tgt_name(tgt));
+		return;
+	}
+
+	src = (char *)kmap(orig) + off;
+	dst = (char *)kmap(np) + off;
+
+	memcpy(dst, src, len);
+	memcpy(dst, tag, min(4, len));
+
+	kunmap(np);
+	/* the original page was mapped above and must be unmapped too */
+	kunmap(orig);
+	desc->bd_iov[idx].kiov_page = np;
+}
+
+/*
+ * Compute the checksum of all pages of a bulk descriptor.
+ *
+ * \a opc selects which fault-injection point applies to the first page:
+ * for OST_WRITE the data is corrupted before it is hashed
+ * (OBD_FAIL_OST_CHECKSUM_RECEIVE), for OST_READ after it is hashed
+ * (OBD_FAIL_OST_CHECKSUM_SEND).
+ *
+ * Returns the checksum. If the hash cannot be initialized, the error code
+ * from cfs_crypto_hash_init() is returned converted to __u32. If the
+ * final step fails, 0 is returned.
+ */
+static __u32 tgt_checksum_bulk(struct lu_target *tgt,
+			       struct ptlrpc_bulk_desc *desc, int opc,
+			       cksum_type_t cksum_type)
+{
+	struct cfs_crypto_hash_desc	*hdesc;
+	unsigned int			 bufsize;
+	int				 i, err;
+	unsigned char			 cfs_alg = cksum_obd2cfs(cksum_type);
+	/* initialized so that a failed final step returns a defined value */
+	__u32				 cksum = 0;
+
+	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
+	if (IS_ERR(hdesc)) {
+		CERROR("%s: unable to initialize checksum hash %s\n",
+		       tgt_name(tgt), cfs_crypto_hash_name(cfs_alg));
+		return PTR_ERR(hdesc);
+	}
+
+	CDEBUG(D_INFO, "Checksum for algo %s\n", cfs_crypto_hash_name(cfs_alg));
+	for (i = 0; i < desc->bd_iov_count; i++) {
+		/* corrupt the data before we compute the checksum, to
+		 * simulate a client->OST data error */
+		if (i == 0 && opc == OST_WRITE &&
+		    OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE))
+			tgt_bulk_corrupt_page(tgt, desc, i, "bad3");
+
+		cfs_crypto_hash_update_page(hdesc, desc->bd_iov[i].kiov_page,
+				  desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK,
+				  desc->bd_iov[i].kiov_len);
+
+		/* corrupt the data after we compute the checksum, to
+		 * simulate an OST->client data error */
+		if (i == 0 && opc == OST_READ &&
+		    OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND))
+			tgt_bulk_corrupt_page(tgt, desc, i, "bad4");
+	}
+
+	bufsize = sizeof(cksum);
+	err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
+	if (err)
+		cfs_crypto_hash_final(hdesc, NULL, NULL);
+
+	return cksum;
+}
+
+/*
+ * Handle a bulk read request.
+ *
+ * The request must arrive on OST_IO_PORTAL and the service thread must
+ * have a big cache (t_data). After optionally taking a server-side PR
+ * extent lock, the pages are prepared with obd_preprw(), attached to a
+ * bulk descriptor, optionally checksummed, sent with target_bulk_io() and
+ * finally released with obd_commitrw().
+ *
+ * Returns the number of bytes read on success or a negative errno. When
+ * the bulk transfer failed or the request timed out while locking, the
+ * reply is suppressed (rq_no_reply).
+ */
+int tgt_brw_read(struct tgt_session_info *tsi)
+{
+	struct ptlrpc_request	*req = tgt_ses_req(tsi);
+	struct ptlrpc_bulk_desc	*desc = NULL;
+	struct obd_export	*exp = tsi->tsi_exp;
+	struct niobuf_remote	*remote_nb;
+	struct niobuf_local	*local_nb;
+	struct obd_ioobj	*ioo;
+	struct ost_body		*body, *repbody;
+	struct l_wait_info	 lwi;
+	struct lustre_handle	 lockh = { 0 };
+	int			 npages, nob = 0, rc, i;
+	int			 no_reply = 0;
+	struct tgt_thread_big_cache *tbc = req->rq_svc_thread->t_data;
+
+	ENTRY;
+
+	if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
+		CERROR("%s: deny read request from %s to portal %u\n",
+		       tgt_name(tsi->tsi_tgt),
+		       obd_export_nid2str(req->rq_export),
+		       ptlrpc_req2svc(req)->srv_req_portal);
+		RETURN(-EPROTO);
+	}
+
+	req->rq_bulk_read = 1;
+
+	if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
+		RETURN(-EIO);
+
+	OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
+
+	/* Check if there is eviction in progress, and if so, wait for it to
+	 * finish */
+	if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
+		/* We do not care how long it takes */
+		lwi = LWI_INTR(NULL, NULL);
+		rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
+			 !cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress),
+			 &lwi);
+	}
+
+	/* There must be big cache in current thread to process this request
+	 * if it is NULL then something went wrong and it wasn't allocated,
+	 * report -ENOMEM in that case */
+	if (tbc == NULL)
+		RETURN(-ENOMEM);
+
+	body = tsi->tsi_ost_body;
+	LASSERT(body != NULL);
+
+	ioo = req_capsule_client_get(tsi->tsi_pill, &RMF_OBD_IOOBJ);
+	LASSERT(ioo != NULL); /* must exists after tgt_ost_body_unpack */
+
+	remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+	LASSERT(remote_nb != NULL); /* must exists after tgt_ost_body_unpack */
+
+	local_nb = tbc->local;
+
+	rc = tgt_brw_lock(exp->exp_obd->obd_namespace, &tsi->tsi_resid, ioo,
+			  remote_nb, &lockh, LCK_PR);
+	if (rc != 0)
+		RETURN(rc);
+
+	/*
+	 * If getting the lock took more time than
+	 * client was willing to wait, drop it. b=11330
+	 */
+	if (cfs_time_current_sec() > req->rq_deadline ||
+	    OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
+		no_reply = 1;
+		CERROR("Dropping timed-out read from %s because locking "
+		       "object "DOSTID" took %ld seconds (limit was %ld).\n",
+		       libcfs_id2str(req->rq_peer), POSTID(&ioo->ioo_oid),
+		       cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
+		       req->rq_deadline - req->rq_arrival_time.tv_sec);
+		GOTO(out_lock, rc = -ETIMEDOUT);
+	}
+
+	repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+	/* same check as in tgt_brw_write(): never dereference a NULL body */
+	if (repbody == NULL)
+		GOTO(out_lock, rc = -ENOMEM);
+	repbody->oa = body->oa;
+
+	npages = PTLRPC_MAX_BRW_PAGES;
+	rc = obd_preprw(tsi->tsi_env, OBD_BRW_READ, exp, &repbody->oa, 1,
+			ioo, remote_nb, &npages, local_nb, NULL, BYPASS_CAPA);
+	if (rc != 0)
+		GOTO(out_lock, rc);
+
+	desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
+				    BULK_PUT_SOURCE, OST_BULK_PORTAL);
+	if (desc == NULL)
+		GOTO(out_commitrw, rc = -ENOMEM);
+
+	nob = 0;
+	for (i = 0; i < npages; i++) {
+		int page_rc = local_nb[i].rc;
+
+		if (page_rc < 0) {
+			rc = page_rc;
+			break;
+		}
+
+		nob += page_rc;
+		if (page_rc != 0) { /* some data! */
+			LASSERT(local_nb[i].page != NULL);
+			ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
+						    local_nb[i].lnb_page_offset,
+						    page_rc);
+		}
+
+		if (page_rc != local_nb[i].len) { /* short read */
+			/* All subsequent pages should be 0 */
+			while (++i < npages)
+				LASSERT(local_nb[i].rc == 0);
+			break;
+		}
+	}
+
+	if (body->oa.o_valid & OBD_MD_FLCKSUM) {
+		cksum_type_t cksum_type =
+			cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
+					  body->oa.o_flags : 0);
+		repbody->oa.o_flags = cksum_type_pack(cksum_type);
+		repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
+		repbody->oa.o_cksum = tgt_checksum_bulk(tsi->tsi_tgt, desc,
+							OST_READ, cksum_type);
+		CDEBUG(D_PAGE, "checksum at read origin: %x\n",
+		       repbody->oa.o_cksum);
+	} else {
+		repbody->oa.o_valid = 0;
+	}
+	/* We're finishing using body->oa as an input variable */
+
+	/* Check if client was evicted while we were doing i/o before touching
+	 * network */
+	if (likely(rc == 0 &&
+		   !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) {
+		rc = target_bulk_io(exp, desc, &lwi);
+		no_reply = rc != 0;
+	}
+
+out_commitrw:
+	/* Must commit after prep above in all cases */
+	rc = obd_commitrw(tsi->tsi_env, OBD_BRW_READ, exp,
+			  &repbody->oa, 1, ioo, remote_nb, npages, local_nb,
+			  NULL, rc);
+	if (rc == 0)
+		tgt_drop_id(exp, &repbody->oa);
+out_lock:
+	tgt_brw_unlock(ioo, remote_nb, &lockh, LCK_PR);
+
+	/* in the reorder fault-injection case the descriptor is freed below */
+	if (desc && !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))
+		ptlrpc_free_bulk_nopin(desc);
+
+	LASSERT(rc <= 0);
+	if (rc == 0) {
+		rc = nob;
+		ptlrpc_lprocfs_brw(req, nob);
+	} else if (no_reply) {
+		req->rq_no_reply = 1;
+		/* reply out callback would free */
+		ptlrpc_req_drop_rs(req);
+		LCONSOLE_WARN("%s: Bulk IO read error with %s (at %s), "
+			      "client will retry: rc %d\n",
+			      exp->exp_obd->obd_name,
+			      obd_uuid2str(&exp->exp_client_uuid),
+			      obd_export_nid2str(exp), rc);
+	}
+	/* send a bulk after reply to simulate a network delay or reordering
+	 * by a router; skipped when no descriptor was ever prepared */
+	if (unlikely(desc != NULL &&
+		     CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) {
+		wait_queue_head_t	 waitq;
+		struct l_wait_info	 lwi1;
+
+		CDEBUG(D_INFO, "reorder BULK\n");
+		init_waitqueue_head(&waitq);
+
+		lwi1 = LWI_TIMEOUT_INTR(cfs_time_seconds(3), NULL, NULL, NULL);
+		l_wait_event(waitq, 0, &lwi1);
+		target_bulk_io(exp, desc, &lwi);
+		ptlrpc_free_bulk_nopin(desc);
+	}
+
+	RETURN(rc);
+}
+EXPORT_SYMBOL(tgt_brw_read);
+
+/*
+ * Report a write checksum mismatch between client and server.
+ *
+ * For mmap writes only a rate-limited debug message is emitted. Otherwise
+ * a console error is printed with the peer, the router the bulk came
+ * through (when the bulk sender differs from the request peer), the
+ * parent FID (zeros unless OBD_MD_FLFID is set), the object id and the
+ * byte extent covered by \a local_nb.
+ */
+static void tgt_warn_on_cksum(struct ptlrpc_request *req,
+			      struct ptlrpc_bulk_desc *desc,
+			      struct niobuf_local *local_nb, int npages,
+			      obd_count client_cksum, obd_count server_cksum,
+			      bool mmap)
+{
+	struct obd_export	*exp = req->rq_export;
+	struct ost_body		*body;
+	char			*router = "";
+	char			*via = "";
+	int			 last = npages - 1;
+
+	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
+	LASSERT(body != NULL);
+
+	/* name the router only when the bulk did not come from the peer */
+	if (req->rq_peer.nid != desc->bd_sender) {
+		via = " via ";
+		router = libcfs_nid2str(desc->bd_sender);
+	}
+
+	if (mmap) {
+		CDEBUG_LIMIT(D_INFO, "client csum %x, server csum %x\n",
+			     client_cksum, server_cksum);
+		return;
+	}
+
+	LCONSOLE_ERROR_MSG(0x168, "BAD WRITE CHECKSUM: %s from %s%s%s inode "
+			   DFID" object "DOSTID" extent ["LPU64"-"LPU64
+			   "]: client csum %x, server csum %x\n",
+			   exp->exp_obd->obd_name, libcfs_id2str(req->rq_peer),
+			   via, router,
+			   body->oa.o_valid & OBD_MD_FLFID ?
+			   body->oa.o_parent_seq : (__u64)0,
+			   body->oa.o_valid & OBD_MD_FLFID ?
+			   body->oa.o_parent_oid : 0,
+			   body->oa.o_valid & OBD_MD_FLFID ?
+			   body->oa.o_parent_ver : 0,
+			   POSTID(&body->oa.o_oi),
+			   local_nb[0].lnb_file_offset,
+			   local_nb[last].lnb_file_offset +
+			   local_nb[last].len - 1,
+			   client_cksum, server_cksum);
+}
+
+/*
+ * Handle a bulk write request.
+ *
+ * The request must arrive on OST_IO_PORTAL and the service thread must
+ * have a big cache (t_data). The reply is packed with one return code per
+ * remote niobuf, a server-side PW extent lock is taken if the client asked
+ * for one, the pages are prepared with obd_preprw(), filled by
+ * target_bulk_io(), optionally checksummed and compared with the client
+ * checksum, and committed with obd_commitrw().
+ *
+ * Returns 0 on success or a negative errno; errors detected before the
+ * reply is packed are wrapped in err_serious(), except the missing big
+ * cache which returns plain -ENOMEM. When the bulk transfer failed, the
+ * lock wait exceeded the deadline or obd_commitrw() returned -ENOTCONN,
+ * the reply is suppressed (rq_no_reply).
+ */
+int tgt_brw_write(struct tgt_session_info *tsi)
+{
+ struct ptlrpc_request *req = tgt_ses_req(tsi);
+ struct ptlrpc_bulk_desc *desc = NULL;
+ struct obd_export *exp = req->rq_export;
+ struct niobuf_remote *remote_nb;
+ struct niobuf_local *local_nb;
+ struct obd_ioobj *ioo;
+ struct ost_body *body, *repbody;
+ struct l_wait_info lwi;
+ struct lustre_handle lockh = {0};
+ __u32 *rcs;
+ int objcount, niocount, npages;
+ int rc, i, j;
+ cksum_type_t cksum_type = OBD_CKSUM_CRC32;
+ bool no_reply = false, mmap;
+ struct tgt_thread_big_cache *tbc = req->rq_svc_thread->t_data;
+
+ ENTRY;
+
+ if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
+ CERROR("%s: deny write request from %s to portal %u\n",
+ tgt_name(tsi->tsi_tgt),
+ obd_export_nid2str(req->rq_export),
+ ptlrpc_req2svc(req)->srv_req_portal);
+ RETURN(err_serious(-EPROTO));
+ }
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
+ RETURN(err_serious(-ENOSPC));
+ if (OBD_FAIL_TIMEOUT(OBD_FAIL_OST_EROFS, 1))
+ RETURN(err_serious(-EROFS));
+
+ req->rq_bulk_write = 1;
+
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
+ RETURN(err_serious(-EIO));
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
+ RETURN(err_serious(-EFAULT));
+
+ /* pause before transaction has been started */
+ OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
+
+ /* There must be big cache in current thread to process this request
+ * if it is NULL then something went wrong and it wasn't allocated,
+ * report -ENOMEM in that case */
+ if (tbc == NULL)
+ RETURN(-ENOMEM);
+
+ body = tsi->tsi_ost_body;
+ LASSERT(body != NULL);
+
+ ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
+ LASSERT(ioo != NULL); /* must exists after tgt_ost_body_unpack */
+
+ objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
+ RCL_CLIENT) / sizeof(*ioo);
+
+ /* total number of remote niobufs announced by all ioobjs */
+ for (niocount = i = 0; i < objcount; i++)
+ niocount += ioo[i].ioo_bufcnt;
+
+ remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
+ LASSERT(remote_nb != NULL); /* must exists after tgt_ost_body_unpack */
+ /* the announced count must match the size of the niobuf buffer */
+ if (niocount != req_capsule_get_size(&req->rq_pill,
+ &RMF_NIOBUF_REMOTE, RCL_CLIENT) /
+ sizeof(*remote_nb))
+ RETURN(err_serious(-EPROTO));
+
+ /* set only when the peer nid equals our own nid on this connection;
+ * cleared again at the "out" label */
+ if ((remote_nb[0].flags & OBD_BRW_MEMALLOC) &&
+ (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
+ memory_pressure_set();
+
+ req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER,
+ niocount * sizeof(*rcs));
+ rc = req_capsule_server_pack(&req->rq_pill);
+ if (rc != 0)
+ GOTO(out, rc = err_serious(rc));
+
+ CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, cfs_fail_val);
+ rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
+
+ local_nb = tbc->local;
+
+ rc = tgt_brw_lock(exp->exp_obd->obd_namespace, &tsi->tsi_resid, ioo,
+ remote_nb, &lockh, LCK_PW);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ /*
+ * If getting the lock took more time than
+ * client was willing to wait, drop it. b=11330
+ */
+ if (cfs_time_current_sec() > req->rq_deadline ||
+ OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
+ no_reply = true;
+ CERROR("%s: Dropping timed-out write from %s because locking "
+ "object "DOSTID" took %ld seconds (limit was %ld).\n",
+ tgt_name(tsi->tsi_tgt), libcfs_id2str(req->rq_peer),
+ POSTID(&ioo->ioo_oid),
+ cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
+ req->rq_deadline - req->rq_arrival_time.tv_sec);
+ GOTO(out_lock, rc = -ETIMEDOUT);
+ }
+
+ /* Because we already sync grant info with client when reconnect,
+ * grant info will be cleared for resent req, then fed_grant and
+ * total_grant will not be modified in following preprw_write */
+ if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
+ DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
+ body->oa.o_valid &= ~OBD_MD_FLGRANT;
+ }
+
+ repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
+ if (repbody == NULL)
+ GOTO(out_lock, rc = -ENOMEM);
+ repbody->oa = body->oa;
+
+ npages = PTLRPC_MAX_BRW_PAGES;
+ rc = obd_preprw(tsi->tsi_env, OBD_BRW_WRITE, exp, &repbody->oa,
+ objcount, ioo, remote_nb, &npages, local_nb, NULL,
+ BYPASS_CAPA);
+ if (rc < 0)
+ GOTO(out_lock, rc);
+
+ desc = ptlrpc_prep_bulk_exp(req, npages, ioobj_max_brw_get(ioo),
+ BULK_GET_SINK, OST_BULK_PORTAL);
+ if (desc == NULL)
+ GOTO(skip_transfer, rc = -ENOMEM);
+
+ /* NB Having prepped, we must commit... */
+ for (i = 0; i < npages; i++)
+ ptlrpc_prep_bulk_page_nopin(desc, local_nb[i].page,
+ local_nb[i].lnb_page_offset,
+ local_nb[i].len);
+
+ rc = sptlrpc_svc_prep_bulk(req, desc);
+ if (rc != 0)
+ GOTO(skip_transfer, rc);
+
+ rc = target_bulk_io(exp, desc, &lwi);
+ no_reply = rc != 0;
+
+skip_transfer:
+ /* the checksum is only verified after a successful transfer */
+ if (body->oa.o_valid & OBD_MD_FLCKSUM && rc == 0) {
+ static int cksum_counter;
+
+ if (body->oa.o_valid & OBD_MD_FLFLAGS)
+ cksum_type = cksum_type_unpack(body->oa.o_flags);
+
+ repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
+ repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
+ repbody->oa.o_flags |= cksum_type_pack(cksum_type);
+ repbody->oa.o_cksum = tgt_checksum_bulk(tsi->tsi_tgt, desc,
+ OST_WRITE, cksum_type);
+ cksum_counter++;
+
+ if (unlikely(body->oa.o_cksum != repbody->oa.o_cksum)) {
+ mmap = (body->oa.o_valid & OBD_MD_FLFLAGS &&
+ body->oa.o_flags & OBD_FL_MMAP);
+
+ tgt_warn_on_cksum(req, desc, local_nb, npages,
+ body->oa.o_cksum,
+ repbody->oa.o_cksum, mmap);
+ cksum_counter = 0;
+ } else if ((cksum_counter & (-cksum_counter)) ==
+ cksum_counter) {
+ /* log only when the counter is a power of two */
+ CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
+ cksum_counter, libcfs_id2str(req->rq_peer),
+ repbody->oa.o_cksum);
+ }
+ }
+
+ /* Must commit after prep above in all cases */
+ rc = obd_commitrw(tsi->tsi_env, OBD_BRW_WRITE, exp, &repbody->oa,
+ objcount, ioo, remote_nb, npages, local_nb, NULL,
+ rc);
+ if (rc == -ENOTCONN)
+ /* quota acquire process has been given up because
+ * either the client has been evicted or the client
+ * has timed out the request already */
+ no_reply = true;
+
+ /*
+ * Disable sending mtime back to the client. If the client locked the
+ * whole object, then it has already updated the mtime on its side,
+ * otherwise it will have to glimpse anyway (see bug 21489, comment 32)
+ */
+ repbody->oa.o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLATIME);
+
+ if (rc == 0) {
+ int nob = 0;
+
+ /* set per-requested niobuf return codes */
+ for (i = j = 0; i < niocount; i++) {
+ int len = remote_nb[i].len;
+
+ nob += len;
+ rcs[i] = 0;
+ /* walk the local buffers that back remote buffer i */
+ do {
+ LASSERT(j < npages);
+ if (local_nb[j].rc < 0)
+ rcs[i] = local_nb[j].rc;
+ len -= local_nb[j].len;
+ j++;
+ } while (len > 0);
+ LASSERT(len == 0);
+ }
+ LASSERT(j == npages);
+ ptlrpc_lprocfs_brw(req, nob);
+
+ tgt_drop_id(exp, &repbody->oa);
+ }
+out_lock:
+ tgt_brw_unlock(ioo, remote_nb, &lockh, LCK_PW);
+ if (desc)
+ ptlrpc_free_bulk_nopin(desc);
+out:
+ if (no_reply) {
+ req->rq_no_reply = 1;
+ /* reply out callback would free */
+ ptlrpc_req_drop_rs(req);
+ LCONSOLE_WARN("%s: Bulk IO write error with %s (at %s), "
+ "client will retry: rc %d\n",
+ exp->exp_obd->obd_name,
+ obd_uuid2str(&exp->exp_client_uuid),
+ obd_export_nid2str(exp), rc);
+ }
+ memory_pressure_clr();
+ RETURN(rc);
+}
+EXPORT_SYMBOL(tgt_brw_write);
#define out_tx_destroy(info, obj, th, reply, idx) \
__out_tx_destroy(info, obj, th, reply, idx, __FILE__, __LINE__)
+extern struct page *tgt_page_to_corrupt;
+
+/*
+ * Per-thread buffer holding PTLRPC_MAX_BRW_PAGES local niobufs. It is
+ * allocated by tgt_io_thread_init(), stored in the service thread's
+ * t_data and used by tgt_brw_read()/tgt_brw_write() as their local_nb
+ * array.
+ */
+struct tgt_thread_big_cache {
+ struct niobuf_local local[PTLRPC_MAX_BRW_PAGES];
+};
+
#endif /* _TG_INTERNAL_H */
ted = &req->rq_export->exp_target_data;
lw_client = exp_connect_flags(req->rq_export) & OBD_CONNECT_LIGHTWEIGHT;
+ if (ted->ted_lr_idx < 0 && !lw_client)
+ /* ofd connect may cause transaction before export has
+ * last_rcvd slot */
+ RETURN(0);
tti->tti_transno = lustre_msg_get_transno(req->rq_reqmsg);
+
spin_lock(&tgt->lut_translock);
if (th->th_result != 0) {
if (tti->tti_transno != 0) {
* last_rcvd, we still want to maintain the in-memory
* lsd_client_data structure in order to properly handle reply
* reconstruction. */
- } else if (ted->ted_lr_off <= 0) {
+ } else if (ted->ted_lr_off == 0) {
CERROR("%s: client idx %d has offset %lld\n",
tgt_name(tgt), ted->ted_lr_idx, ted->ted_lr_off);
RETURN(-EINVAL);
}
EXPORT_SYMBOL(tgt_last_rcvd_update);
+/*
+ * last_rcvd update for echo client simulation.
+ * It updates last_rcvd client slot and version of object in
+ * simple way but with all locks to simulate all drawbacks
+ *
+ * A new transno is assigned under lut_translock only when the transaction
+ * result is 0; otherwise the transno stays 0. The export must already
+ * have a last_rcvd slot (ted_lr_off > 0).
+ *
+ * Returns the result of tgt_client_data_write().
+ */
+int tgt_last_rcvd_update_echo(const struct lu_env *env, struct lu_target *tgt,
+ struct dt_object *obj, struct thandle *th,
+ struct obd_export *exp)
+{
+ struct tgt_thread_info *tti = tgt_th_info(env);
+ struct tg_export_data *ted = &exp->exp_target_data;
+ int rc = 0;
+
+ ENTRY;
+
+ tti->tti_transno = 0;
+
+ spin_lock(&tgt->lut_translock);
+ if (th->th_result == 0)
+ tti->tti_transno = ++tgt->lut_last_transno;
+ spin_unlock(&tgt->lut_translock);
+
+ /** VBR: set new versions */
+ if (th->th_result == 0 && obj != NULL)
+ dt_version_set(env, obj, tti->tti_transno, th);
+
+ /* if can't add callback, do sync write */
+ th->th_sync |= !!tgt_last_commit_cb_add(th, tgt, exp,
+ tti->tti_transno);
+
+ LASSERT(ted->ted_lr_off > 0);
+
+ /* the client data is updated and written under ted_lcd_lock */
+ mutex_lock(&ted->ted_lcd_lock);
+ LASSERT(ergo(tti->tti_transno == 0, th->th_result != 0));
+ ted->ted_lcd->lcd_last_transno = tti->tti_transno;
+ ted->ted_lcd->lcd_last_result = th->th_result;
+
+ tti->tti_off = ted->ted_lr_off;
+ rc = tgt_client_data_write(env, tgt, ted->ted_lcd, &tti->tti_off, th);
+ mutex_unlock(&ted->ted_lcd_lock);
+ RETURN(rc);
+}
+EXPORT_SYMBOL(tgt_last_rcvd_update_echo);
lut->lut_mds_capa = 1;
lut->lut_oss_capa = 1;
+ spin_lock_init(&lut->lut_flags_lock);
+ lut->lut_sync_lock_cancel = NEVER_SYNC_ON_CANCEL;
+
/* last_rcvd initialization is needed by replayable targets only */
if (!obd->obd_replayable)
RETURN(0);
LU_KEY_INIT_GENERIC(tgt_ses);
+/*
+ * This page is allocated once with alloc_page() in tgt_mod_init() when
+ * the module is initialized. It is used to simulate data corruption, see
+ * tgt_checksum_bulk() for details. As the original pages provided by the
+ * layers below can remain in the internal cache, we do not want to
+ * modify them.
+ */
+struct page *tgt_page_to_corrupt;
+
int tgt_mod_init(void)
{
ENTRY;
+ tgt_page_to_corrupt = alloc_page(GFP_IOFS);
+
tgt_key_init_generic(&tgt_thread_key, NULL);
lu_context_key_register_many(&tgt_thread_key, NULL);
void tgt_mod_exit(void)
{
+ if (tgt_page_to_corrupt != NULL)
+ page_cache_release(tgt_page_to_corrupt);
+
lu_context_key_degister(&tgt_thread_key);
lu_context_key_degister(&tgt_session_key);
}