+ cl_object_put(env, obj);
+ cl_env_put(env, &refcheck);
+ RETURN(0);
+}
+
+static int cl_echo_enqueue0(struct lu_env *env, struct echo_object *eco,
+ obd_off start, obd_off end, int mode,
+ __u64 *cookie , __u32 enqflags)
+{
+ struct cl_io *io;
+ struct cl_lock *lck;
+ struct cl_object *obj;
+ struct cl_lock_descr *descr;
+ struct echo_thread_info *info;
+ int rc = -ENOMEM;
+ ENTRY;
+
+ info = echo_env_info(env);
+ io = &info->eti_io;
+ descr = &info->eti_descr;
+ obj = echo_obj2cl(eco);
+
+ descr->cld_obj = obj;
+ descr->cld_start = cl_index(obj, start);
+ descr->cld_end = cl_index(obj, end);
+ descr->cld_mode = mode == LCK_PW ? CLM_WRITE : CLM_READ;
+ descr->cld_enq_flags = enqflags;
+ io->ci_obj = obj;
+
+ lck = cl_lock_request(env, io, descr, "ec enqueue", eco);
+ if (lck) {
+ struct echo_client_obd *ec = eco->eo_dev->ed_ec;
+ struct echo_lock *el;
+
+ rc = cl_wait(env, lck);
+ if (rc == 0) {
+ el = cl2echo_lock(cl_lock_at(lck, &echo_device_type));
+ cfs_spin_lock(&ec->ec_lock);
+ if (cfs_list_empty(&el->el_chain)) {
+ cfs_list_add(&el->el_chain, &ec->ec_locks);
+ el->el_cookie = ++ec->ec_unique;
+ }
+ cfs_atomic_inc(&el->el_refcount);
+ *cookie = el->el_cookie;
+ cfs_spin_unlock(&ec->ec_lock);
+ } else
+ cl_lock_release(env, lck, "ec enqueue", cfs_current());
+ }
+ RETURN(rc);
+}
+
+static int cl_echo_enqueue(struct echo_object *eco, obd_off start, obd_off end,
+ int mode, __u64 *cookie)
+{
+ struct echo_thread_info *info;
+ struct lu_env *env;
+ struct cl_io *io;
+ int refcheck;
+ int result;
+ ENTRY;
+
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env))
+ RETURN(PTR_ERR(env));
+
+ info = echo_env_info(env);
+ io = &info->eti_io;
+
+ result = cl_io_init(env, io, CIT_MISC, echo_obj2cl(eco));
+ if (result < 0)
+ GOTO(out, result);
+ LASSERT(result == 0);
+
+ result = cl_echo_enqueue0(env, eco, start, end, mode, cookie, 0);
+ cl_io_fini(env, io);
+
+ EXIT;
+out:
+ cl_env_put(env, &refcheck);
+ return result;
+}
+
+static int cl_echo_cancel0(struct lu_env *env, struct echo_device *ed,
+ __u64 cookie)
+{
+ struct echo_client_obd *ec = ed->ed_ec;
+ struct echo_lock *ecl = NULL;
+ cfs_list_t *el;
+ int found = 0, still_used = 0;
+ ENTRY;
+
+ LASSERT(ec != NULL);
+ cfs_spin_lock (&ec->ec_lock);
+ cfs_list_for_each (el, &ec->ec_locks) {
+ ecl = cfs_list_entry (el, struct echo_lock, el_chain);
+ CDEBUG(D_INFO, "ecl: %p, cookie: "LPX64"\n", ecl, ecl->el_cookie);
+ found = (ecl->el_cookie == cookie);
+ if (found) {
+ if (cfs_atomic_dec_and_test(&ecl->el_refcount))
+ cfs_list_del_init(&ecl->el_chain);
+ else
+ still_used = 1;
+ break;
+ }
+ }
+ cfs_spin_unlock (&ec->ec_lock);
+
+ if (!found)
+ RETURN(-ENOENT);
+
+ echo_lock_release(env, ecl, still_used);
+ RETURN(0);
+}
+
+static int cl_echo_cancel(struct echo_device *ed, __u64 cookie)
+{
+ struct lu_env *env;
+ int refcheck;
+ int rc;
+ ENTRY;
+
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env))
+ RETURN(PTR_ERR(env));
+
+ rc = cl_echo_cancel0(env, ed, cookie);
+
+ cl_env_put(env, &refcheck);
+ RETURN(rc);
+}
+
+static int cl_echo_async_brw(const struct lu_env *env, struct cl_io *io,
+ enum cl_req_type unused, struct cl_2queue *queue)
+{
+ struct cl_page *clp;
+ struct cl_page *temp;
+ int result = 0;
+ ENTRY;
+
+ cl_page_list_for_each_safe(clp, temp, &queue->c2_qin) {
+ int rc;
+ rc = cl_page_cache_add(env, io, clp, CRT_WRITE);
+ if (rc == 0)
+ continue;
+ result = result ?: rc;
+ }
+ RETURN(result);
+}
+
+static int cl_echo_object_brw(struct echo_object *eco, int rw, obd_off offset,
+ cfs_page_t **pages, int npages, int async)
+{
+ struct lu_env *env;
+ struct echo_thread_info *info;
+ struct cl_object *obj = echo_obj2cl(eco);
+ struct echo_device *ed = eco->eo_dev;
+ struct cl_2queue *queue;
+ struct cl_io *io;
+ struct cl_page *clp;
+ struct lustre_handle lh = { 0 };
+ int page_size = cl_page_size(obj);
+ int refcheck;
+ int rc;
+ int i;
+ ENTRY;
+
+ LASSERT((offset & ~CFS_PAGE_MASK) == 0);
+ LASSERT(ed->ed_next != NULL);
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env))
+ RETURN(PTR_ERR(env));
+
+ info = echo_env_info(env);
+ io = &info->eti_io;
+ queue = &info->eti_queue;
+
+ cl_2queue_init(queue);
+ rc = cl_io_init(env, io, CIT_MISC, obj);
+ if (rc < 0)
+ GOTO(out, rc);
+ LASSERT(rc == 0);
+
+
+ rc = cl_echo_enqueue0(env, eco, offset,
+ offset + npages * CFS_PAGE_SIZE - 1,
+ rw == READ ? LCK_PR : LCK_PW, &lh.cookie,
+ CEF_NEVER);
+ if (rc < 0)
+ GOTO(error_lock, rc);
+
+ for (i = 0; i < npages; i++) {
+ LASSERT(pages[i]);
+ clp = cl_page_find(env, obj, cl_index(obj, offset),
+ pages[i], CPT_TRANSIENT);
+ if (IS_ERR(clp)) {
+ rc = PTR_ERR(clp);
+ break;
+ }
+ LASSERT(clp->cp_type == CPT_TRANSIENT);
+
+ rc = cl_page_own(env, io, clp);
+ if (rc) {
+ LASSERT(clp->cp_state == CPS_FREEING);
+ cl_page_put(env, clp);
+ break;
+ }
+
+ cl_2queue_add(queue, clp);
+
+ /* drop the reference count for cl_page_find, so that the page
+ * will be freed in cl_2queue_fini. */
+ cl_page_put(env, clp);
+ cl_page_clip(env, clp, 0, page_size);
+
+ offset += page_size;
+ }
+
+ if (rc == 0) {
+ enum cl_req_type typ = rw == READ ? CRT_READ : CRT_WRITE;
+
+ async = async && (typ == CRT_WRITE);
+ if (async)
+ rc = cl_echo_async_brw(env, io, typ, queue);
+ else
+ rc = cl_io_submit_sync(env, io, typ, queue,
+ CRP_NORMAL, 0);
+ CDEBUG(D_INFO, "echo_client %s write returns %d\n",
+ async ? "async" : "sync", rc);
+ }
+
+ cl_echo_cancel0(env, ed, lh.cookie);
+ EXIT;
+error_lock:
+ cl_2queue_discard(env, io, queue);
+ cl_2queue_disown(env, io, queue);
+ cl_2queue_fini(env, queue);
+ cl_io_fini(env, io);
+out:
+ cl_env_put(env, &refcheck);
+ return rc;
+}
+/** @} echo_exports */
+
+
+static obd_id last_object_id;
+
+static int
+echo_copyout_lsm (struct lov_stripe_md *lsm, void *_ulsm, int ulsm_nob)
+{
+ struct lov_stripe_md *ulsm = _ulsm;
+ int nob, i;
+
+ nob = offsetof (struct lov_stripe_md, lsm_oinfo[lsm->lsm_stripe_count]);
+ if (nob > ulsm_nob)
+ return (-EINVAL);
+
+ if (cfs_copy_to_user (ulsm, lsm, sizeof(ulsm)))
+ return (-EFAULT);
+
+ for (i = 0; i < lsm->lsm_stripe_count; i++) {
+ if (cfs_copy_to_user (ulsm->lsm_oinfo[i], lsm->lsm_oinfo[i],
+ sizeof(lsm->lsm_oinfo[0])))
+ return (-EFAULT);
+ }
+ return 0;
+}
+
+static int
+echo_copyin_lsm (struct echo_device *ed, struct lov_stripe_md *lsm,
+ void *ulsm, int ulsm_nob)
+{
+ struct echo_client_obd *ec = ed->ed_ec;
+ int i;
+
+ if (ulsm_nob < sizeof (*lsm))
+ return (-EINVAL);