Whamcloud - gitweb
LU-1934 echo: echo client to work over OFD
authorMikhail Pershin <tappro@whamcloud.com>
Mon, 1 Oct 2012 16:44:41 +0000 (20:44 +0400)
committerOleg Drokin <green@whamcloud.com>
Wed, 3 Oct 2012 04:33:20 +0000 (00:33 -0400)
- echo client doesn't call obd_alloc/free_memmd() to underlaying
  device but implement own methods
- Use real lu_env in the prep/commitrw calls
- fill OBD_BRW_ASYNC flag for async writes

Signed-off-by: Mikhail Pershin <tappro@whamcloud.com>
Change-Id: Ic02d4c2ad08693cd47d8dff5e8a86812a1534763
Reviewed-on: http://review.whamcloud.com/4146
Tested-by: Hudson
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Tested-by: Maloo <whamcloud.maloo@gmail.com>
Reviewed-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-by: wangdi <di.wang@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/obdecho/echo_client.c

index 5239db5..3ee8138 100644 (file)
@@ -50,6 +50,7 @@
 #include <lustre_fid.h>
 #include <lustre_acl.h>
 #include <lustre_net.h>
+#include <obd_lov.h>
 
 #include "echo_internal.h"
 
@@ -522,6 +523,56 @@ static int echo_object_init(const struct lu_env *env, struct lu_object *obj,
         RETURN(0);
 }
 
+/* taken from osc_unpackmd() */
+static int echo_alloc_memmd(struct echo_device *ed,
+                           struct lov_stripe_md **lsmp)
+{
+       int lsm_size;
+
+       ENTRY;
+
+       /* If export is lov/osc then use their obd method */
+       if (ed->ed_next != NULL)
+               return obd_alloc_memmd(ed->ed_ec->ec_exp, lsmp);
+       /* OFD has no unpackmd method, do everything here */
+       lsm_size = lov_stripe_md_size(1);
+
+       LASSERT(*lsmp == NULL);
+       OBD_ALLOC(*lsmp, lsm_size);
+       if (*lsmp == NULL)
+               RETURN(-ENOMEM);
+
+       OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
+       if ((*lsmp)->lsm_oinfo[0] == NULL) {
+               OBD_FREE(*lsmp, lsm_size);
+               RETURN(-ENOMEM);
+       }
+
+       loi_init((*lsmp)->lsm_oinfo[0]);
+       (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
+
+       RETURN(lsm_size);
+}
+
+static int echo_free_memmd(struct echo_device *ed, struct lov_stripe_md **lsmp)
+{
+       int lsm_size;
+
+       ENTRY;
+
+       /* If export is lov/osc then use their obd method */
+       if (ed->ed_next != NULL)
+               return obd_free_memmd(ed->ed_ec->ec_exp, lsmp);
+       /* OFD has no unpackmd method, do everything here */
+       lsm_size = lov_stripe_md_size(1);
+
+       LASSERT(*lsmp != NULL);
+       OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
+       OBD_FREE(*lsmp, lsm_size);
+       *lsmp = NULL;
+       RETURN(0);
+}
+
 static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
 {
         struct echo_object *eco    = cl2echo_obj(lu2cl(obj));
@@ -538,7 +589,7 @@ static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
         lu_object_header_fini(obj->lo_header);
 
        if (eco->eo_lsm)
-               obd_free_memmd(ec->ec_exp, &eco->eo_lsm);
+               echo_free_memmd(eco->eo_dev, &eco->eo_lsm);
        OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
        EXIT;
 }
@@ -1820,14 +1871,6 @@ static int echo_md_destroy_internal(const struct lu_env *env,
         CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
 
-#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 3, 55, 0)
-        /* After 2.4, MDT will send destroy RPC to OST directly, so no need
-         * this flag */
-        ma->ma_valid |= MA_FLAGS;
-        ma->ma_attr_flags |= MDS_UNLINK_DESTROY;
-#else
-#warning "Please remove this after 2.4 (LOD/OSP)"
-#endif
         rc = mdo_unlink(env, parent, lu2md(child), lname, ma);
         if (rc) {
                 CERROR("Can not unlink child %s: rc = %d\n",
@@ -2095,7 +2138,7 @@ static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
                 RETURN(-EINVAL);
         }
 
-        rc = obd_alloc_memmd(ec->ec_exp, &lsm);
+       rc = echo_alloc_memmd(ed, &lsm);
         if (rc < 0) {
                 CERROR("Cannot allocate md: rc = %d\n", rc);
                 GOTO(failed, rc);
@@ -2162,7 +2205,7 @@ static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
         if (created && rc)
                 obd_destroy(env, ec->ec_exp, oa, lsm, oti, NULL, NULL);
         if (lsm)
-                obd_free_memmd(ec->ec_exp, &lsm);
+               echo_free_memmd(ed, &lsm);
         if (rc)
                 CERROR("create object failed with: rc = %d\n", rc);
         return (rc);
@@ -2171,7 +2214,6 @@ static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
 static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
                            struct obdo *oa)
 {
-        struct echo_client_obd *ec  = ed->ed_ec;
         struct lov_stripe_md   *lsm = NULL;
         struct echo_object     *eco;
         int                     rc;
@@ -2184,7 +2226,7 @@ static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
                 RETURN(-EINVAL);
         }
 
-        rc = obd_alloc_memmd(ec->ec_exp, &lsm);
+       rc = echo_alloc_memmd(ed, &lsm);
         if (rc < 0)
                 RETURN(rc);
 
@@ -2201,7 +2243,7 @@ static int echo_get_object(struct echo_object **ecop, struct echo_device *ed,
         else
                 rc = PTR_ERR(eco);
         if (lsm)
-                obd_free_memmd(ec->ec_exp, &lsm);
+               echo_free_memmd(ed, &lsm);
         RETURN(rc);
 }
 
@@ -2400,11 +2442,12 @@ static int echo_client_kbrw(struct echo_device *ed, int rw, struct obdo *oa,
         RETURN(rc);
 }
 
-static int echo_client_prep_commit(struct obd_export *exp, int rw,
-                                   struct obdo *oa, struct echo_object *eco,
-                                   obd_off offset, obd_size count,
-                                   obd_size batch, struct obd_trans_info *oti,
-                                   int async)
+static int echo_client_prep_commit(const struct lu_env *env,
+                                  struct obd_export *exp, int rw,
+                                  struct obdo *oa, struct echo_object *eco,
+                                  obd_off offset, obd_size count,
+                                  obd_size batch, struct obd_trans_info *oti,
+                                  int async)
 {
         struct lov_stripe_md *lsm = eco->eo_lsm;
         struct obd_ioobj ioo;
@@ -2412,7 +2455,8 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
         struct niobuf_remote *rnb;
         obd_off off;
         obd_size npages, tot_pages;
-        int i, ret = 0;
+       int i, ret = 0, brw_flags = 0;
+
         ENTRY;
 
         if (count <= 0 || (count & (~CFS_PAGE_MASK)) != 0 ||
@@ -2428,6 +2472,9 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
         if (lnb == NULL || rnb == NULL)
                 GOTO(out, ret = -ENOMEM);
 
+       if (rw == OBD_BRW_WRITE && async)
+               brw_flags |= OBD_BRW_ASYNC;
+
         obdo_to_ioobj(oa, &ioo);
 
         off = offset;
@@ -2441,13 +2488,14 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
                 for (i = 0; i < npages; i++, off += CFS_PAGE_SIZE) {
                         rnb[i].offset = off;
                         rnb[i].len = CFS_PAGE_SIZE;
+                       rnb[i].flags = brw_flags;
                 }
 
                 ioo.ioo_bufcnt = npages;
                 oti->oti_transno = 0;
 
                 lpages = npages;
-                ret = obd_preprw(NULL, rw, exp, oa, 1, &ioo, rnb, &lpages,
+               ret = obd_preprw(env, rw, exp, oa, 1, &ioo, rnb, &lpages,
                                  lnb, oti, NULL);
                 if (ret != 0)
                         GOTO(out, ret);
@@ -2480,8 +2528,8 @@ static int echo_client_prep_commit(struct obd_export *exp, int rw,
                                                              rnb[i].len);
                 }
 
-                ret = obd_commitrw(NULL, rw, exp, oa, 1, &ioo,
-                                   rnb, npages, lnb, oti, ret);
+               ret = obd_commitrw(env, rw, exp, oa, 1, &ioo,
+                                  rnb, npages, lnb, oti, ret);
                 if (ret != 0)
                         GOTO(out, ret);
 
@@ -2497,13 +2545,14 @@ out:
         RETURN(ret);
 }
 
-static int echo_client_brw_ioctl(int rw, struct obd_export *exp,
-                                 struct obd_ioctl_data *data)
+static int echo_client_brw_ioctl(const struct lu_env *env, int rw,
+                                struct obd_export *exp,
+                                struct obd_ioctl_data *data,
+                                struct obd_trans_info *dummy_oti)
 {
         struct obd_device *obd = class_exp2obd(exp);
         struct echo_device *ed = obd2echo_dev(obd);
         struct echo_client_obd *ec = ed->ed_ec;
-        struct obd_trans_info dummy_oti = { 0 };
         struct obdo *oa = &data->ioc_obdo1;
         struct echo_object *eco;
         int rc;
@@ -2519,7 +2568,7 @@ static int echo_client_brw_ioctl(int rw, struct obd_export *exp,
 
         oa->o_valid &= ~OBD_MD_FLHANDLE;
 
-        /* obdfilter doesn't support obd_brw now, simulate via prep + commit */
+       /* OFD/obdfilter works only via prep/commit */
         test_mode = (long)data->ioc_pbuf1;
         if (test_mode == 1)
                 async = 0;
@@ -2539,13 +2588,13 @@ static int echo_client_brw_ioctl(int rw, struct obd_export *exp,
         case 2:
                 rc = echo_client_kbrw(ed, rw, oa,
                                       eco, data->ioc_offset,
-                                      data->ioc_count, async, &dummy_oti);
+                                     data->ioc_count, async, dummy_oti);
                 break;
         case 3:
-                rc = echo_client_prep_commit(ec->ec_exp, rw, oa,
-                                            eco, data->ioc_offset,
-                                            data->ioc_count, data->ioc_plen1,
-                                            &dummy_oti, async);
+               rc = echo_client_prep_commit(env, ec->ec_exp, rw, oa,
+                                            eco, data->ioc_offset,
+                                            data->ioc_count, data->ioc_plen1,
+                                            dummy_oti, async);
                 break;
         default:
                 rc = -EINVAL;
@@ -2764,7 +2813,7 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                 rw = OBD_BRW_WRITE;
                 /* fall through */
         case OBD_IOC_BRW_READ:
-                rc = echo_client_brw_ioctl(rw, exp, data);
+               rc = echo_client_brw_ioctl(env, rw, exp, data, &dummy_oti);
                 GOTO(out, rc);
 
         case ECHO_IOC_GET_STRIPE: