Whamcloud - gitweb
LU-4088 obdecho: Copy entire lsm, not just pointer size
[fs/lustre-release.git] / lustre / obdecho / echo_client.c
index b4344ec..0b22468 100644 (file)
@@ -189,25 +189,27 @@ static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
 static struct echo_thread_info *echo_env_info(const struct lu_env *env);
 
 struct echo_thread_info {
-        struct echo_object_conf eti_conf;
-        struct lustre_md        eti_md;
+       struct echo_object_conf eti_conf;
+       struct lustre_md        eti_md;
 
-        struct cl_2queue        eti_queue;
-        struct cl_io            eti_io;
-        struct cl_lock_descr    eti_descr;
-        struct lu_fid           eti_fid;
+       struct cl_2queue        eti_queue;
+       struct cl_io            eti_io;
+       struct cl_lock_descr    eti_descr;
+       struct lu_fid           eti_fid;
        struct lu_fid           eti_fid2;
-        struct md_op_spec       eti_spec;
-        struct lov_mds_md_v3    eti_lmm;
-        struct lov_user_md_v3   eti_lum;
-        struct md_attr          eti_ma;
-        struct lu_name          eti_lname;
+#ifdef HAVE_SERVER_SUPPORT
+       struct md_op_spec       eti_spec;
+       struct lov_mds_md_v3    eti_lmm;
+       struct lov_user_md_v3   eti_lum;
+       struct md_attr          eti_ma;
+       struct lu_name          eti_lname;
        /* per-thread values, can be re-used */
        void                    *eti_big_lmm;
        int                     eti_big_lmmsize;
-        char                    eti_name[20];
-        struct lu_buf           eti_buf;
-        char                    eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
+       char                    eti_name[20];
+       struct lu_buf           eti_buf;
+       char                    eti_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE];
+#endif
 };
 
 /* No session used right now */
@@ -734,7 +736,8 @@ static struct lu_context_key echo_session_key = {
 
 LU_TYPE_INIT_FINI(echo, &echo_thread_key, &echo_session_key);
 
-#define ECHO_SEQ_WIDTH 0xffffffff
+#ifdef HAVE_SERVER_SUPPORT
+# define ECHO_SEQ_WIDTH 0xffffffff
 static int echo_fid_init(struct echo_device *ed, char *obd_name,
                         struct seq_server_site *ss)
 {
@@ -782,6 +785,7 @@ static int echo_fid_fini(struct obd_device *obddev)
 
         RETURN(0);
 }
+#endif /* HAVE_SERVER_SUPPORT */
 
 static struct lu_device *echo_device_alloc(const struct lu_env *env,
                                            struct lu_device_type *t,
@@ -841,6 +845,7 @@ static struct lu_device *echo_device_alloc(const struct lu_env *env,
         cleanup = 4;
 
         if (ed->ed_next_ismd) {
+#ifdef HAVE_SERVER_SUPPORT
                 /* Suppose to connect to some Metadata layer */
                 struct lu_site *ls;
                 struct lu_device *ld;
@@ -887,6 +892,12 @@ static struct lu_device *echo_device_alloc(const struct lu_env *env,
                        CERROR("echo fid init error %d\n", rc);
                        GOTO(out, rc);
                }
+#else /* !HAVE_SERVER_SUPPORT */
+               CERROR("Local operations are NOT supported on client side. "
+                      "Only remote operations are supported. Metadata client "
+                      "must be run on server side.\n");
+               GOTO(out, rc = -EOPNOTSUPP);
+#endif
         } else {
                  /* if echo client is to be stacked upon ost device, the next is
                   * NULL since ost is not a clio device so far */
@@ -1023,12 +1034,14 @@ static struct lu_device *echo_device_free(const struct lu_env *env,
 
         LASSERT(cfs_list_empty(&ec->ec_locks));
 
-        CDEBUG(D_INFO, "No object exists, exiting...\n");
+       CDEBUG(D_INFO, "No object exists, exiting...\n");
 
-        echo_client_cleanup(d->ld_obd);
-        echo_fid_fini(d->ld_obd);
-        while (next && !ed->ed_next_ismd)
-                next = next->ld_type->ldt_ops->ldto_device_free(env, next);
+       echo_client_cleanup(d->ld_obd);
+#ifdef HAVE_SERVER_SUPPORT
+       echo_fid_fini(d->ld_obd);
+#endif
+       while (next && !ed->ed_next_ismd)
+               next = next->ld_type->ldt_ops->ldto_device_free(env, next);
 
         LASSERT(ed->ed_site == lu2cl_site(d->ld_site));
         echo_site_fini(env, ed);
@@ -1288,22 +1301,17 @@ static int cl_echo_cancel(struct echo_device *ed, __u64 cookie)
         RETURN(rc);
 }
 
-static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io,
-                             enum cl_req_type unused, struct cl_2queue *queue)
+static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
+                               struct cl_page *page)
 {
-        struct cl_page *clp;
-        struct cl_page *temp;
-        int result = 0;
-        ENTRY;
+       struct echo_thread_info *info;
+       struct cl_2queue        *queue;
 
-        cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) {
-                int rc;
-                rc = cl_page_cache_add(env, io, clp, CRT_WRITE);
-                if (rc == 0)
-                        continue;
-                result = result ?: rc;
-        }
-        RETURN(result);
+       info = echo_env_info(env);
+       LASSERT(io == &info->eti_io);
+
+       queue = &info->eti_queue;
+       cl_page_list_add(&queue->c2_qout, page);
 }
 
 static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
@@ -1381,8 +1389,10 @@ static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
 
                 async = async && (typ == CRT_WRITE);
                 if (async)
-                        rc = cl_echo_async_brw(env, io, typ, queue);
-                else
+                       rc = cl_io_commit_async(env, io, &queue->c2_qin,
+                                               0, PAGE_SIZE,
+                                               echo_commit_callback);
+               else
                        rc = cl_io_submit_sync(env, io, typ, queue, 0);
                 CDEBUG(D_INFO, "echo_client %s write returns %d\n",
                        async ? "async" : "sync", rc);
@@ -1414,7 +1424,7 @@ echo_copyout_lsm (struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob)
         if (nob > ulsm_nob)
                 return (-EINVAL);
 
-       if (copy_to_user (ulsm, lsm, sizeof(ulsm)))
+       if (copy_to_user (ulsm, lsm, sizeof(*ulsm)))
                 return (-EFAULT);
 
         for (i = 0; i < lsm->lsm_stripe_count; i++) {
@@ -1455,6 +1465,7 @@ echo_copyin_lsm (struct echo_device *ed, struct lov_stripe_md *lsm,
         return (0);
 }
 
+#ifdef HAVE_SERVER_SUPPORT
 static inline void echo_md_build_name(struct lu_name *lname, char *name,
                                      __u64 id)
 {
@@ -1949,6 +1960,11 @@ static int echo_md_destroy_internal(const struct lu_env *env,
                 GOTO(out_put, rc = -EINVAL);
         }
 
+       if (lu_object_remote(child)) {
+               CERROR("Can not destroy remote object %s: rc = %d\n",
+                      lname->ln_name, -EPERM);
+               GOTO(out_put, rc = -EPERM);
+       }
         CDEBUG(D_RPCTRACE, "Start destroy object "DFID" %s %p\n",
                PFID(lu_object_fid(&parent->mo_lu)), lname->ln_name, parent);
 
@@ -2119,7 +2135,7 @@ static void echo_ucred_fini(struct lu_env *env)
 }
 
 #define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
-#define ECHO_MD_SES_TAG (LCT_REMEMBER | LCT_SESSION)
+#define ECHO_MD_SES_TAG (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
 static int echo_md_handler(struct echo_device *ed, int command,
                           char *path, int path_len, __u64 id, int count,
                           struct obd_ioctl_data *data)
@@ -2236,6 +2252,7 @@ out_env:
         cl_env_put(env, &refcheck);
         return rc;
 }
+#endif /* HAVE_SERVER_SUPPORT */
 
 static int echo_create_object(const struct lu_env *env, struct echo_device *ed,
                               int on_target, struct obdo *oa, void *ulsm,
@@ -2777,6 +2794,9 @@ static int
 echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                       void *karg, void *uarg)
 {
+#ifdef HAVE_SERVER_SUPPORT
+       struct tgt_session_info *tsi;
+#endif
         struct obd_device      *obd = exp->exp_obd;
         struct echo_device     *ed = obd2echo_dev(obd);
         struct echo_client_obd *ec = ed->ed_ec;
@@ -2790,6 +2810,9 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         int                     rw = OBD_BRW_READ;
         int                     rc = 0;
         int                     i;
+#ifdef HAVE_SERVER_SUPPORT
+       struct lu_context        echo_session;
+#endif
         ENTRY;
 
         memset(&dummy_oti, 0, sizeof(dummy_oti));
@@ -2809,10 +2832,20 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
         if (env == NULL)
                 RETURN(-ENOMEM);
 
-        rc = lu_env_init(env, LCT_DT_THREAD | LCT_MD_THREAD);
-        if (rc)
-                GOTO(out, rc = -ENOMEM);
+       rc = lu_env_init(env, LCT_DT_THREAD);
+       if (rc)
+               GOTO(out_alloc, rc = -ENOMEM);
 
+#ifdef HAVE_SERVER_SUPPORT
+       env->le_ses = &echo_session;
+       rc = lu_context_init(env->le_ses, LCT_SERVER_SESSION | LCT_NOREF);
+       if (unlikely(rc < 0))
+               GOTO(out_env, rc);
+       lu_context_enter(env->le_ses);
+
+       tsi = tgt_ses_info(env);
+       tsi->tsi_exp = ec->ec_exp;
+#endif
         switch (cmd) {
         case OBD_IOC_CREATE:                    /* may create echo object */
                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
@@ -2822,6 +2855,7 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                                         data->ioc_plen1, &dummy_oti);
                 GOTO(out, rc);
 
+#ifdef HAVE_SERVER_SUPPORT
        case OBD_IOC_ECHO_MD: {
                int count;
                int cmd;
@@ -2835,8 +2869,7 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                count = data->ioc_count;
                cmd = data->ioc_command;
 
-               id = ostid_id(&data->ioc_obdo2.o_oi);
-
+               id = data->ioc_obdo2.o_oi.oi.oi_id;
                dirlen = data->ioc_plen1;
                OBD_ALLOC(dir, dirlen + 1);
                if (dir == NULL)
@@ -2888,6 +2921,7 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
                        return -EFAULT;
                GOTO(out, rc);
         }
+#endif /* HAVE_SERVER_SUPPORT */
         case OBD_IOC_DESTROY:
                 if (!cfs_capable(CFS_CAP_SYS_ADMIN))
                         GOTO (out, rc = -EPERM);
@@ -2985,7 +3019,13 @@ echo_client_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
 
         EXIT;
 out:
+#ifdef HAVE_SERVER_SUPPORT
+       lu_context_exit(env->le_ses);
+       lu_context_fini(env->le_ses);
+out_env:
+#endif
         lu_env_fini(env);
+out_alloc:
         OBD_FREE_PTR(env);
 
         /* XXX this should be in a helper also called by target_send_reply */
@@ -3027,11 +3067,17 @@ static int echo_client_setup(const struct lu_env *env,
         ec->ec_unique = 0;
         ec->ec_nstripes = 0;
 
-        if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
-                lu_context_tags_update(ECHO_MD_CTX_TAG);
-                lu_session_tags_update(ECHO_MD_SES_TAG);
-                RETURN(0);
-        }
+       if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
+#ifdef HAVE_SERVER_SUPPORT
+               lu_context_tags_update(ECHO_MD_CTX_TAG);
+               lu_session_tags_update(ECHO_MD_SES_TAG);
+#else
+               CERROR("Local operations are NOT supported on client side. "
+                      "Only remote operations are supported. Metadata client "
+                      "must be run on server side.\n");
+#endif
+               RETURN(0);
+       }
 
         OBD_ALLOC(ocd, sizeof(*ocd));
         if (ocd == NULL) {
@@ -3080,8 +3126,13 @@ static int echo_client_cleanup(struct obd_device *obddev)
                 RETURN(0);
 
         if (ed->ed_next_ismd) {
-                lu_context_tags_clear(ECHO_MD_CTX_TAG);
-                lu_session_tags_clear(ECHO_MD_SES_TAG);
+#ifdef HAVE_SERVER_SUPPORT
+               lu_context_tags_clear(ECHO_MD_CTX_TAG);
+               lu_session_tags_clear(ECHO_MD_SES_TAG);
+#else
+               CERROR("This is client-side only module, does not support "
+                       "metadata echo client.\n");
+#endif
                 RETURN(0);
         }
 
@@ -3175,16 +3226,18 @@ int echo_client_init(void)
 
         lprocfs_echo_init_vars(&lvars);
 
-        rc = lu_kmem_init(echo_caches);
-        if (rc == 0) {
-                rc = class_register_type(&echo_client_obd_ops, NULL,
-                                         lvars.module_vars,
-                                         LUSTRE_ECHO_CLIENT_NAME,
-                                         &echo_device_type);
-                if (rc)
-                        lu_kmem_fini(echo_caches);
-        }
-        return rc;
+       rc = lu_kmem_init(echo_caches);
+       if (rc == 0) {
+               rc = class_register_type(&echo_client_obd_ops, NULL, NULL,
+#ifndef HAVE_ONLY_PROCFS_SEQ
+                                       lvars.module_vars,
+#endif
+                                       LUSTRE_ECHO_CLIENT_NAME,
+                                       &echo_device_type);
+               if (rc)
+                       lu_kmem_fini(echo_caches);
+       }
+       return rc;
 }
 
 void echo_client_exit(void)
@@ -3211,10 +3264,13 @@ static int __init obdecho_init(void)
         if (rc != 0)
                 goto failed_0;
 
-        rc = class_register_type(&echo_obd_ops, NULL, lvars.module_vars,
-                                 LUSTRE_ECHO_NAME, NULL);
-        if (rc != 0)
-                goto failed_1;
+       rc = class_register_type(&echo_obd_ops, NULL, NULL,
+#ifndef HAVE_ONLY_PROCFS_SEQ
+                               lvars.module_vars,
+#endif
+                               LUSTRE_ECHO_NAME, NULL);
+       if (rc != 0)
+               goto failed_1;
 # endif
 
         rc = echo_client_init();