From: Mikhail Pershin Date: Mon, 1 Oct 2012 16:44:41 +0000 (+0400) Subject: LU-1934 echo: echo client to work over OFD X-Git-Tag: 2.3.52~13 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=e068c2070c8d0d57b25b13afc83399b6d0b1e2d9 LU-1934 echo: echo client to work over OFD - echo client doesn't call obd_alloc/free_memmd() to underlaying device but implement own methods - Use real lu_env in the prep/commitrw calls - fill OBD_BRW_ASYNC flag for async writes Signed-off-by: Mikhail Pershin Change-Id: Ic02d4c2ad08693cd47d8dff5e8a86812a1534763 Reviewed-on: http://review.whamcloud.com/4146 Tested-by: Hudson Reviewed-by: Andreas Dilger Tested-by: Maloo Reviewed-by: Alex Zhuravlev Reviewed-by: wangdi Reviewed-by: Oleg Drokin --- diff --git a/lustre/obdecho/echo_client.c b/lustre/obdecho/echo_client.c index 5239db5..3ee8138 100644 --- a/lustre/obdecho/echo_client.c +++ b/lustre/obdecho/echo_client.c @@ -50,6 +50,7 @@ #include #include #include +#include #include "echo_internal.h" @@ -522,6 +523,56 @@ static int echo_object_init(const struct lu_env *env, struct lu_object *obj, RETURN(0); } +/* taken from osc_unpackmd() */ +static int echo_alloc_memmd(struct echo_device *ed, + struct lov_stripe_md **lsmp) +{ + int lsm_size; + + ENTRY; + + /* If export is lov/osc then use their obd method */ + if (ed->ed_next != NULL) + return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp); + /* OFD has no unpackmd method, do everything here */ + lsm_size = lov_stripe_md_size(1); + + LASSERT(*lsmp == NULL); + OBD_ALLOC(*lsmp, lsm_size); + if (*lsmp == NULL) + RETURN(-ENOMEM); + + OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); + if ((*lsmp)->lsm_oinfo[0] == NULL) { + OBD_FREE(*lsmp, lsm_size); + RETURN(-ENOMEM); + } + + loi_init((*lsmp)->lsm_oinfo[0]); + (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES; + + RETURN(lsm_size); +} + +static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp) +{ + int lsm_size; + + ENTRY; + + /* If export is lov/osc then use their obd method */ + if (ed->ed_next != NULL) + return obd_free_memmd(ed->ed_ec->ec_exp, lsmp); + /* OFD has no unpackmd method, do everything here */ + lsm_size = lov_stripe_md_size(1); + + LASSERT(*lsmp != NULL); + OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo)); + OBD_FREE(*lsmp, lsm_size); + *lsmp = NULL; + RETURN(0); +} + static void echo_object_free(const struct lu_env *env, struct lu_object *obj) { struct echo_object *eco = cl2echo_obj(lu2cl(obj)); @@ -538,7 +589,7 @@ static void echo_object_free(const struct lu_env *env, struct lu_object *obj) lu_object_header_fini(obj->lo_header); if (eco->eo_lsm) - obd_free_memmd(ec->ec_exp, &eco->eo_lsm); + echo_free_memmd(eco->eo_dev, &eco->eo_lsm); OBD_SLAB_FREE_PTR(eco, echo_object_kmem); EXIT; } @@ -1820,14 +1871,6 @@ static int echo_md_destroy_internal(const struct lu_env *env, CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n", PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent); -#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 55, 0) - /* After 2.4, MDT will send destroy RPC to OST directly, so no need - * this flag */ - ma->ma_valid |= MA_FLAGS; - ma->ma_attr_flags |= MDS_UNLINK_DESTROY; -#else -#warning "Please remove this after 2.4 (LOD/OSP)" -#endif rc = mdo_unlink(env, parent, lu2md(child), lname, ma); if (rc) { CERROR("Can not unlink child %s: rc = %d\n", @@ -2095,7 +2138,7 @@ static int echo_create_object(const struct lu_env *env, struct echo_device *ed, RETURN(-EINVAL); } - rc = obd_alloc_memmd(ec->ec_exp, &lsm); + rc = echo_alloc_memmd(ed, &lsm); if (rc < 0) { CERROR("Cannot allocate md: rc = %d\n", rc); GOTO(failed, rc); @@ -2162,7 +2205,7 @@ static int echo_create_object(const struct lu_env *env, struct echo_device *ed, if (created && rc) obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL, NULL); if (lsm) - obd_free_memmd(ec->ec_exp, &lsm); + echo_free_memmd(ed, &lsm); if (rc) CERROR("create object failed with: rc = %d\n", rc); return (rc); @@ -2171,7 +2214,6 @@ static int echo_create_object(const struct lu_env *env, struct echo_device *ed, static int echo_get_object(struct echo_object **ecop, struct echo_device *ed, struct obdo *oa) { - struct echo_client_obd *ec = ed->ed_ec; struct lov_stripe_md *lsm = NULL; struct echo_object *eco; int rc; @@ -2184,7 +2226,7 @@ static int echo_get_object(struct echo_object **ecop, struct echo_device *ed, RETURN(-EINVAL); } - rc = obd_alloc_memmd(ec->ec_exp, &lsm); + rc = echo_alloc_memmd(ed, &lsm); if (rc < 0) RETURN(rc); @@ -2201,7 +2243,7 @@ static int echo_get_object(struct echo_object **ecop, struct echo_device *ed, else rc = PTR_ERR(eco); if (lsm) - obd_free_memmd(ec->ec_exp, &lsm); + echo_free_memmd(ed, &lsm); RETURN(rc); } @@ -2400,11 +2442,12 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa, RETURN(rc); } -static int echo_client_prep_commit(struct obd_export *exp, int rw, - struct obdo *oa, struct echo_object *eco, - obd_off offset, obd_size count, - obd_size batch, struct obd_trans_info *oti, - int async) +static int echo_client_prep_commit(const struct lu_env *env, + struct obd_export *exp, int rw, + struct obdo *oa, struct echo_object *eco, + obd_off offset, obd_size count, + obd_size batch, struct obd_trans_info *oti, + int async) { struct lov_stripe_md *lsm = eco->eo_lsm; struct obd_ioobj ioo; @@ -2412,7 +2455,8 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw, struct niobuf_remote *rnb; obd_off off; obd_size npages, tot_pages; - int i, ret = 0; + int i, ret = 0, brw_flags = 0; + ENTRY; if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 || @@ -2428,6 +2472,9 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw, if (lnb == NULL || rnb == NULL) GOTO(out, ret = -ENOMEM); + if (rw == OBD_BRW_WRITE && async) + brw_flags |= OBD_BRW_ASYNC; + obdo_to_ioobj(oa, &ioo); off = offset; @@ -2441,13 +2488,14 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw, for (i = 0; i < npages; i++, off += CFS_PAGE_SIZE) { rnb[i].offset = off; rnb[i].len = CFS_PAGE_SIZE; + rnb[i].flags = brw_flags; } ioo.ioo_bufcnt = npages; oti->oti_transno = 0; lpages = npages; - ret = obd_preprw(NULL, rw, exp, oa, 1, &ioo, rnb, &lpages, + ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages, lnb, oti, NULL); if (ret != 0) GOTO(out, ret); @@ -2480,8 +2528,8 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw, rnb[i].len); } - ret = obd_commitrw(NULL, rw, exp, oa, 1, &ioo, - rnb, npages, lnb, oti, ret); + ret = obd_commitrw(env, rw, exp, oa, 1, &ioo, + rnb, npages, lnb, oti, ret); if (ret != 0) GOTO(out, ret); @@ -2497,13 +2545,14 @@ out: RETURN(ret); } -static int echo_client_brw_ioctl(int rw, struct obd_export *exp, - struct obd_ioctl_data *data) +static int echo_client_brw_ioctl(const struct lu_env *env, int rw, + struct obd_export *exp, + struct obd_ioctl_data *data, + struct obd_trans_info *dummy_oti) { struct obd_device *obd = class_exp2obd(exp); struct echo_device *ed = obd2echo_dev(obd); struct echo_client_obd *ec = ed->ed_ec; - struct obd_trans_info dummy_oti = { 0 }; struct obdo *oa = &data->ioc_obdo1; struct echo_object *eco; int rc; @@ -2519,7 +2568,7 @@ static int echo_client_brw_ioctl(int rw, struct obd_export *exp, oa->o_valid &= ~OBD_MD_FLHANDLE; - /* obdfilter doesn't support obd_brw now, simulate via prep + commit */ + /* OFD/obdfilter works only via prep/commit */ test_mode = (long)data->ioc_pbuf1; if (test_mode == 1) async = 0; @@ -2539,13 +2588,13 @@ static int echo_client_brw_ioctl(int rw, struct obd_export *exp, case 2: rc = echo_client_kbrw(ed, rw, oa, eco, data->ioc_offset, - data->ioc_count, async, &dummy_oti); + data->ioc_count, async, dummy_oti); break; case 3: - rc = echo_client_prep_commit(ec->ec_exp, rw, oa, - eco, data->ioc_offset, - data->ioc_count, data->ioc_plen1, - &dummy_oti, async); + rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa, + eco, data->ioc_offset, + data->ioc_count, data->ioc_plen1, + dummy_oti, async); break; default: rc = -EINVAL; @@ -2764,7 +2813,7 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len, rw = OBD_BRW_WRITE; /* fall through */ case OBD_IOC_BRW_READ: - rc = echo_client_brw_ioctl(rw, exp, data); + rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti); GOTO(out, rc); case ECHO_IOC_GET_STRIPE: