Whamcloud - gitweb
LU-2675 build: remove liblustre and libsysio
[fs/lustre-release.git] / lustre / liblustre / llite_cl.c
diff --git a/lustre/liblustre/llite_cl.c b/lustre/liblustre/llite_cl.c
deleted file mode 100644 (file)
index e4ed309..0000000
+++ /dev/null
@@ -1,769 +0,0 @@
-/*
- *   Copyright (c) 2007 Cluster File Systems, Inc.
- *   Author: Nikita Danilov <nikita@clusterfs.com>
- *
- *   This file is part of Lustre, http://www.lustre.org.
- *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
- *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
- *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
- *
- *   Copyright (c) 2011, 2013, Intel Corporation.
- */
-
-#define DEBUG_SUBSYSTEM S_LLITE
-
-#include <errno.h>
-#include <stdarg.h>
-#include <stddef.h>
-#include <sys/stat.h>
-#include <sys/types.h>
-#include <sys/uio.h>
-#include <libcfs/libcfs.h>
-#include <lustre/lustre_idl.h>
-#include <liblustre.h>
-#include <lclient.h>
-#include <cl_object.h>
-#include <lustre_export.h>
-#include <lustre_lite.h>
-#include <obd.h>
-#include <obd_support.h>
-#include "llite_lib.h"
-
-/*
- * slp_ prefix stands for "Sysio Library Posix". It corresponds to historical
- * "llu_" prefix.
- */
-
-static int   slp_type_init     (struct lu_device_type *t);
-static void  slp_type_fini     (struct lu_device_type *t);
-
-static int slp_page_init(const struct lu_env *env, struct cl_object *obj,
-                        struct cl_page *page, pgoff_t index);
-static int   slp_attr_get     (const struct lu_env *env, struct cl_object *obj,
-                               struct cl_attr *attr);
-
-static struct lu_device  *slp_device_alloc(const struct lu_env *env,
-                                           struct lu_device_type *t,
-                                           struct lustre_cfg *cfg);
-
-static int slp_io_init(const struct lu_env *env, struct cl_object *obj,
-                       struct cl_io *io);
-static struct slp_io *cl2slp_io(const struct lu_env *env,
-                                const struct cl_io_slice *slice);
-
-
-static void llu_free_user_page(struct page *page);
-
-static const struct lu_object_operations      slp_lu_obj_ops;
-static const struct lu_device_operations      slp_lu_ops;
-static const struct cl_device_operations      slp_cl_ops;
-static const struct cl_io_operations          ccc_io_ops;
-static const struct lu_device_type_operations slp_device_type_ops;
-             //struct lu_device_type            slp_device_type;
-static const struct cl_page_operations        slp_transient_page_ops;
-static const struct cl_lock_operations        slp_lock_ops;
-
-
-/*****************************************************************************
- *
- * Slp device and device type functions.
- *
- */
-
-static void *slp_session_key_init(const struct lu_context *ctx,
-                                 struct lu_context_key *key)
-{
-        struct slp_session *session;
-
-        OBD_ALLOC_PTR(session);
-        if (session == NULL)
-                session = ERR_PTR(-ENOMEM);
-        return session;
-}
-
-static void slp_session_key_fini(const struct lu_context *ctx,
-                                struct lu_context_key *key, void *data)
-{
-        struct slp_session *session = data;
-        OBD_FREE_PTR(session);
-}
-
-struct lu_context_key slp_session_key = {
-        .lct_tags = LCT_SESSION,
-        .lct_init = slp_session_key_init,
-        .lct_fini = slp_session_key_fini
-};
-
-/* type constructor/destructor: slp_type_{init,fini,start,stop}(). */
-LU_TYPE_INIT_FINI(slp, &ccc_key, &ccc_session_key, &slp_session_key);
-
-static struct lu_device *slp_device_alloc(const struct lu_env *env,
-                                          struct lu_device_type *t,
-                                          struct lustre_cfg *cfg)
-{
-        return ccc_device_alloc(env, t, cfg, &slp_lu_ops, &slp_cl_ops);
-}
-
-static int slp_lock_init(const struct lu_env *env,
-                         struct cl_object *obj, struct cl_lock *lock,
-                         const struct cl_io *io)
-{
-        return ccc_lock_init(env, obj, lock, io, &slp_lock_ops);
-}
-
-static const struct cl_object_operations slp_ops = {
-        .coo_page_init = slp_page_init,
-        .coo_lock_init = slp_lock_init,
-        .coo_io_init   = slp_io_init,
-        .coo_attr_get  = slp_attr_get,
-        .coo_attr_set  = ccc_attr_set,
-        .coo_conf_set  = ccc_conf_set,
-        .coo_glimpse   = ccc_object_glimpse
-};
-
-static int slp_object_print(const struct lu_env *env, void *cookie,
-                            lu_printer_t p, const struct lu_object *o)
-{
-        struct ccc_object *obj   = lu2ccc(o);
-        struct inode      *inode = obj->cob_inode;
-        struct intnl_stat *st = NULL;
-
-        if (inode)
-                st = llu_i2stat(inode);
-
-        return (*p)(env, cookie, LUSTRE_SLP_NAME"-object@%p(%p:%lu/%u)",
-                    obj, inode,
-                    st ? (unsigned long)st->st_ino : 0UL,
-                    inode ? (unsigned int)llu_i2info(inode)->lli_st_generation
-                    : 0);
-}
-
-static const struct lu_object_operations slp_lu_obj_ops = {
-        .loo_object_init      = ccc_object_init,
-        .loo_object_start     = NULL,
-        .loo_object_delete    = NULL,
-        .loo_object_release   = NULL,
-        .loo_object_free      = ccc_object_free,
-        .loo_object_print     = slp_object_print,
-        .loo_object_invariant = NULL
-};
-
-static struct lu_object *slp_object_alloc(const struct lu_env *env,
-                                          const struct lu_object_header *hdr,
-                                          struct lu_device *dev)
-{
-        return ccc_object_alloc(env, hdr, dev, &slp_ops, &slp_lu_obj_ops);
-}
-
-static const struct lu_device_operations slp_lu_ops = {
-        .ldo_object_alloc      = slp_object_alloc
-};
-
-static const struct cl_device_operations slp_cl_ops = {
-        .cdo_req_init = ccc_req_init
-};
-
-static const struct lu_device_type_operations slp_device_type_ops = {
-        .ldto_init = slp_type_init,
-        .ldto_fini = slp_type_fini,
-
-        .ldto_start = slp_type_start,
-        .ldto_stop  = slp_type_stop,
-
-        .ldto_device_alloc = slp_device_alloc,
-        .ldto_device_free  = ccc_device_free,
-        .ldto_device_init  = ccc_device_init,
-        .ldto_device_fini  = ccc_device_fini
-};
-
-static struct lu_device_type slp_device_type = {
-        .ldt_tags     = LU_DEVICE_CL,
-        .ldt_name     = LUSTRE_SLP_NAME,
-        .ldt_ops      = &slp_device_type_ops,
-        .ldt_ctx_tags = LCT_CL_THREAD
-};
-
-int slp_global_init(void)
-{
-        int result;
-
-        result = ccc_global_init(&slp_device_type);
-        return result;
-}
-
-void slp_global_fini(void)
-{
-        ccc_global_fini(&slp_device_type);
-}
-
-/*****************************************************************************
- *
- * Object operations.
- *
- */
-
-static int slp_page_init(const struct lu_env *env, struct cl_object *obj,
-                       struct cl_page *page, pgoff_t index)
-{
-       struct ccc_page *cpg = cl_object_page_slice(obj, page);
-
-       CLOBINVRNT(env, obj, ccc_object_invariant(obj));
-
-       cpg->cpg_page = page->cp_vmpage;
-
-       if (page->cp_type == CPT_CACHEABLE) {
-               LBUG();
-       } else {
-               struct ccc_object *clobj = cl2ccc(obj);
-
-               cl_page_slice_add(page, &cpg->cpg_cl, obj, index,
-                                 &slp_transient_page_ops);
-               clobj->cob_transient_pages++;
-       }
-
-       return 0;
-}
-
-static int slp_io_init(const struct lu_env *env, struct cl_object *obj,
-                       struct cl_io *io)
-{
-        struct ccc_io      *vio   = ccc_env_io(env);
-        int result = 0;
-
-        CLOBINVRNT(env, obj, ccc_object_invariant(obj));
-
-        cl_io_slice_add(io, &vio->cui_cl, obj, &ccc_io_ops);
-        if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE) {
-                size_t count;
-
-                count = io->u.ci_rw.crw_count;
-                /* "If nbyte is 0, read() will return 0 and have no other
-                 *  results."  -- Single Unix Spec */
-                if (count == 0)
-                        result = 1;
-                else {
-                        vio->cui_tot_count = count;
-                        vio->cui_tot_nrsegs = 0;
-                }
-
-        }
-        return result;
-}
-
-static int slp_attr_get(const struct lu_env *env, struct cl_object *obj,
-                        struct cl_attr *attr)
-{
-        struct inode *inode = ccc_object_inode(obj);
-        struct intnl_stat *st = llu_i2stat(inode);
-
-        attr->cat_size = st->st_size;
-        attr->cat_blocks = st->st_blocks;
-        attr->cat_mtime  = st->st_mtime;
-        attr->cat_atime  = st->st_atime;
-        attr->cat_ctime  = st->st_ctime;
-        /* KMS is not known by this layer */
-        return 0; /* layers below have to fill in the rest */
-}
-
-/*****************************************************************************
- *
- * Page operations.
- *
- */
-
-static void slp_page_fini_common(struct ccc_page *cp)
-{
-       struct page *vmpage = cp->cpg_page;
-
-        LASSERT(vmpage != NULL);
-        llu_free_user_page(vmpage);
-        OBD_FREE_PTR(cp);
-}
-
-static void slp_page_completion_common(const struct lu_env *env,
-                                       struct ccc_page *cp, int ioret)
-{
-        LASSERT(cp->cpg_cl.cpl_page->cp_sync_io != NULL);
-}
-
-static void slp_page_completion_read(const struct lu_env *env,
-                                     const struct cl_page_slice *slice,
-                                     int ioret)
-{
-        struct ccc_page *cp      = cl2ccc_page(slice);
-        ENTRY;
-
-        slp_page_completion_common(env, cp, ioret);
-
-        EXIT;
-}
-
-static void slp_page_completion_write_common(const struct lu_env *env,
-                                             const struct cl_page_slice *slice,
-                                             int ioret)
-{
-        struct ccc_page *cp     = cl2ccc_page(slice);
-
-        if (ioret == 0) {
-                cp->cpg_write_queued = 0;
-                /*
-                 * Only ioret == 0, write succeed, then this page could be
-                 * deleted from the pending_writing count.
-                 */
-        }
-        slp_page_completion_common(env, cp, ioret);
-}
-
-static int slp_page_is_vmlocked(const struct lu_env *env,
-                                const struct cl_page_slice *slice)
-{
-        return -EBUSY;
-}
-
-static void slp_transient_page_fini(const struct lu_env *env,
-                                    struct cl_page_slice *slice)
-{
-        struct ccc_page *cp = cl2ccc_page(slice);
-        struct cl_page *clp = slice->cpl_page;
-        struct ccc_object *clobj = cl2ccc(clp->cp_obj);
-
-        slp_page_fini_common(cp);
-        clobj->cob_transient_pages--;
-}
-
-
-static const struct cl_page_operations slp_transient_page_ops = {
-        .cpo_own           = ccc_transient_page_own,
-        .cpo_assume        = ccc_transient_page_assume,
-        .cpo_unassume      = ccc_transient_page_unassume,
-        .cpo_disown        = ccc_transient_page_disown,
-        .cpo_discard       = ccc_transient_page_discard,
-        .cpo_is_vmlocked   = slp_page_is_vmlocked,
-        .cpo_fini          = slp_transient_page_fini,
-        .io = {
-                [CRT_READ] = {
-                        .cpo_completion  = slp_page_completion_read,
-                },
-                [CRT_WRITE] = {
-                        .cpo_completion  = slp_page_completion_write_common,
-                }
-        }
-};
-
-/*****************************************************************************
- *
- * Lock operations.
- *
- */
-
-static int slp_lock_enqueue(const struct lu_env *env,
-                           const struct cl_lock_slice *slice,
-                           struct cl_io *unused, __u32 enqflags)
-{
-        CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
-
-        liblustre_wait_event(0);
-        return 0;
-}
-
-static const struct cl_lock_operations slp_lock_ops = {
-        .clo_delete    = ccc_lock_delete,
-        .clo_fini      = ccc_lock_fini,
-        .clo_enqueue   = slp_lock_enqueue,
-        .clo_wait      = ccc_lock_wait,
-        .clo_unuse     = ccc_lock_unuse,
-        .clo_fits_into = ccc_lock_fits_into,
-};
-
-/*****************************************************************************
- *
- * io operations.
- *
- */
-
-static int slp_io_rw_lock(const struct lu_env *env,
-                          const struct cl_io_slice *ios)
-{
-        struct ccc_io *cio = ccc_env_io(env);
-        struct cl_io *io   = ios->cis_io;
-        loff_t start;
-        loff_t end;
-
-        if (cl_io_is_append(io)) {
-                start = 0;
-                end   = OBD_OBJECT_EOF;
-        } else {
-                start = io->u.ci_wr.wr.crw_pos;
-                end   = start + io->u.ci_wr.wr.crw_count - 1;
-        }
-
-        ccc_io_update_iov(env, cio, io);
-
-        /*
-         * This acquires real DLM lock only in O_APPEND case, because of
-         * the io->ci_lockreq setting in llu_io_init().
-         */
-        LASSERT(ergo(cl_io_is_append(io), io->ci_lockreq == CILR_MANDATORY));
-        LASSERT(ergo(!cl_io_is_append(io), io->ci_lockreq == CILR_NEVER));
-        return ccc_io_one_lock(env, io, 0,
-                               io->ci_type == CIT_READ ? CLM_READ : CLM_WRITE,
-                               start, end);
-
-}
-
-static int slp_io_setattr_iter_init(const struct lu_env *env,
-                                    const struct cl_io_slice *ios)
-{
-        return 0;
-}
-
-static int slp_io_setattr_start(const struct lu_env *env,
-                                const struct cl_io_slice *ios)
-{
-        return 0;
-}
-
-static struct page *llu_get_user_page(int index, void *addr, int offset,
-                                      int count)
-{
-        struct page *page;
-
-        OBD_ALLOC_PTR(page);
-        if (!page)
-                return NULL;
-        page->index = index;
-        page->addr = addr;
-        page->_offset = offset;
-        page->_count = count;
-
-        CFS_INIT_LIST_HEAD(&page->list);
-        CFS_INIT_LIST_HEAD(&page->_node);
-
-        return page;
-}
-
-static void llu_free_user_page(struct page *page)
-{
-        OBD_FREE_PTR(page);
-}
-
-
-static int llu_queue_pio(const struct lu_env *env, struct cl_io *io,
-                         struct llu_io_group *group,
-                         char *buf, size_t count, loff_t pos)
-{
-        struct cl_object *obj = io->ci_obj;
-        struct inode *inode = ccc_object_inode(obj);
-        struct intnl_stat *st = llu_i2stat(inode);
-        struct obd_export *exp = llu_i2obdexp(inode);
-        struct page *page;
-        int  rc = 0, ret_bytes = 0;
-        struct cl_page *clp;
-        struct cl_2queue *queue;
-        ENTRY;
-
-        if (!exp)
-                RETURN(-EINVAL);
-
-        queue = &io->ci_queue;
-        cl_2queue_init(queue);
-
-
-        /* prepare the pages array */
-        do {
-                unsigned long index, offset, bytes;
-
-                offset = (pos & ~CFS_PAGE_MASK);
-               index = pos >> PAGE_CACHE_SHIFT;
-               bytes = PAGE_CACHE_SIZE - offset;
-                if (bytes > count)
-                        bytes = count;
-
-                /* prevent read beyond file range */
-                if (/* local_lock && */
-                    io->ci_type == CIT_READ && pos + bytes >= st->st_size) {
-                        if (pos >= st->st_size)
-                                break;
-                        bytes = st->st_size - pos;
-                }
-
-                /* prepare page for this index */
-                page = llu_get_user_page(index, buf - offset, offset, bytes);
-                if (!page) {
-                        rc = -ENOMEM;
-                        break;
-                }
-
-                clp = cl_page_find(env, obj,
-                                   cl_index(obj, pos),
-                                   page, CPT_TRANSIENT);
-
-                if (IS_ERR(clp)) {
-                        rc = PTR_ERR(clp);
-                        break;
-                }
-
-                rc = cl_page_own(env, io, clp);
-                if (rc) {
-                        LASSERT(clp->cp_state == CPS_FREEING);
-                        cl_page_put(env, clp);
-                        break;
-                }
-
-                cl_2queue_add(queue, clp);
-
-                /* drop the reference count for cl_page_find, so that the page
-                 * will be freed in cl_2queue_fini. */
-                cl_page_put(env, clp);
-
-                cl_page_clip(env, clp, offset, offset+bytes);
-
-                count -= bytes;
-                pos += bytes;
-                buf += bytes;
-
-                group->lig_rwcount += bytes;
-                ret_bytes += bytes;
-                page++;
-        } while (count);
-
-        if (rc == 0) {
-                enum cl_req_type iot;
-                iot = io->ci_type == CIT_READ ? CRT_READ : CRT_WRITE;
-               rc = cl_io_submit_sync(env, io, iot, queue, 0);
-        }
-
-        group->lig_rc = rc;
-
-        cl_2queue_discard(env, io, queue);
-        cl_2queue_disown(env, io, queue);
-        cl_2queue_fini(env, queue);
-
-        RETURN(ret_bytes);
-}
-
-static
-struct llu_io_group *get_io_group(struct inode *inode, int maxpages)
-{
-        struct llu_io_group *group;
-
-        OBD_ALLOC_PTR(group);
-        if (!group)
-                return ERR_PTR(-ENOMEM);
-
-        return group;
-}
-
-static int max_io_pages(ssize_t len, int iovlen)
-{
-       return ((len + PAGE_CACHE_SIZE - 1) / PAGE_CACHE_SIZE) +
-               2 + iovlen - 1;
-}
-
-void put_io_group(struct llu_io_group *group)
-{
-        OBD_FREE_PTR(group);
-}
-
-/**
- * True, if \a io is a normal io, False for sendfile() / splice_{read|write}
- */
-int cl_is_normalio(const struct lu_env *env, const struct cl_io *io)
-{
-        return 1;
-}
-
-static int slp_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
-{
-        struct ccc_io     *cio   = cl2ccc_io(env, ios);
-        struct cl_io      *io    = ios->cis_io;
-        struct cl_object  *obj   = io->ci_obj;
-        struct inode      *inode = ccc_object_inode(obj);
-        int    err, ret;
-        loff_t pos;
-        long   cnt;
-        struct llu_io_group *iogroup;
-        int iovidx;
-        struct intnl_stat *st = llu_i2stat(inode);
-        struct llu_inode_info *lli = llu_i2info(inode);
-        struct llu_io_session *session = cl2slp_io(env, ios)->sio_session;
-        int write = io->ci_type == CIT_WRITE;
-        int exceed = 0;
-
-        CLOBINVRNT(env, obj, ccc_object_invariant(obj));
-
-        if (write) {
-                pos = io->u.ci_wr.wr.crw_pos;
-                cnt = io->u.ci_wr.wr.crw_count;
-        } else {
-                pos = io->u.ci_rd.rd.crw_pos;
-                cnt = io->u.ci_rd.rd.crw_count;
-        }
-
-       iogroup = get_io_group(inode, max_io_pages(cnt, cio->cui_nrsegs));
-        if (IS_ERR(iogroup))
-                RETURN(PTR_ERR(iogroup));
-
-        err = ccc_prep_size(env, obj, io, pos, cnt, &exceed);
-        if (err != 0 || (write == 0 && exceed != 0))
-                GOTO(out, err);
-
-        CDEBUG(D_INODE,
-               "%s ino %lu, %lu bytes, offset "LPU64", i_size "LPU64"\n",
-               write ? "Write" : "Read", (unsigned long)st->st_ino,
-               cnt, (__u64)pos, (__u64)st->st_size);
-
-        if (write && io->u.ci_wr.wr_append)
-                pos = io->u.ci_wr.wr.crw_pos = st->st_size; /* XXX? Do we need to change io content too here? */
-                /* XXX What about if one write syscall writes at 2 different offsets? */
-
-        for (iovidx = 0; iovidx < cio->cui_nrsegs; iovidx++) {
-                char *buf = (char *) cio->cui_iov[iovidx].iov_base;
-                long count = cio->cui_iov[iovidx].iov_len;
-
-                if (!count)
-                        continue;
-                if (cnt < count)
-                        count = cnt;
-                if (IS_BAD_PTR(buf) || IS_BAD_PTR(buf + count)) {
-                        GOTO(out, err = -EFAULT);
-                }
-
-                if (io->ci_type == CIT_READ) {
-                        if (/* local_lock && */ pos >= st->st_size)
-                                break;
-                } else if (io->ci_type == CIT_WRITE) {
-                        if (pos >= lli->lli_maxbytes) {
-                                GOTO(out, err = -EFBIG);
-                        }
-                        if (pos + count >= lli->lli_maxbytes)
-                                count = lli->lli_maxbytes - pos;
-                } else {
-                        LBUG();
-                }
-
-                ret = llu_queue_pio(env, io, iogroup, buf, count, pos);
-                if (ret < 0) {
-                        GOTO(out, err = ret);
-                } else {
-                        io->ci_nob += ret;
-                        pos += ret;
-                        cnt -= ret;
-                        if (io->ci_type == CIT_WRITE) {
-//                                obd_adjust_kms(exp, lsm, pos, 0); // XXX
-                                if (pos > st->st_size)
-                                        st->st_size = pos;
-                        }
-                        if (!cnt)
-                                break;
-                }
-        }
-        LASSERT(cnt == 0 || io->ci_type == CIT_READ); /* libsysio should guarantee this */
-
-        if (!iogroup->lig_rc)
-                session->lis_rwcount += iogroup->lig_rwcount;
-        else if (!session->lis_rc)
-                session->lis_rc = iogroup->lig_rc;
-        err = 0;
-
-out:
-        put_io_group(iogroup);
-        return err;
-}
-
-static const struct cl_io_operations ccc_io_ops = {
-        .op = {
-                [CIT_READ] = {
-                        .cio_fini      = ccc_io_fini,
-                        .cio_lock      = slp_io_rw_lock,
-                        .cio_start     = slp_io_start,
-                        .cio_end       = ccc_io_end,
-                        .cio_advance   = ccc_io_advance
-                },
-                [CIT_WRITE] = {
-                        .cio_fini      = ccc_io_fini,
-                        .cio_lock      = slp_io_rw_lock,
-                        .cio_start     = slp_io_start,
-                        .cio_end       = ccc_io_end,
-                        .cio_advance   = ccc_io_advance
-                },
-                [CIT_SETATTR] = {
-                        .cio_fini       = ccc_io_fini,
-                        .cio_iter_init  = slp_io_setattr_iter_init,
-                        .cio_start      = slp_io_setattr_start
-                },
-                [CIT_MISC] = {
-                        .cio_fini   = ccc_io_fini
-                }
-        }
-};
-
-static struct slp_io *cl2slp_io(const struct lu_env *env,
-                                const struct cl_io_slice *slice)
-{
-        /* We call it just for assertion here */
-        cl2ccc_io(env, slice);
-
-        return slp_env_io(env);
-}
-
-/*****************************************************************************
- *
- * Temporary prototype thing: mirror obd-devices into cl devices.
- *
- */
-
-int cl_sb_init(struct llu_sb_info *sbi)
-{
-        struct cl_device  *cl;
-        struct lu_env     *env;
-        int rc = 0;
-        int refcheck;
-
-        env = cl_env_get(&refcheck);
-        if (IS_ERR(env))
-                RETURN(PTR_ERR(env));
-
-        cl = cl_type_setup(env, NULL, &slp_device_type,
-                           sbi->ll_dt_exp->exp_obd->obd_lu_dev);
-        if (IS_ERR(cl))
-                GOTO(out, rc = PTR_ERR(cl));
-
-        sbi->ll_cl = cl;
-        sbi->ll_site = cl2lu_dev(cl)->ld_site;
-out:
-        cl_env_put(env, &refcheck);
-        RETURN(rc);
-}
-
-int cl_sb_fini(struct llu_sb_info *sbi)
-{
-        struct lu_env *env;
-        int refcheck;
-
-        ENTRY;
-
-        env = cl_env_get(&refcheck);
-        if (IS_ERR(env))
-                RETURN(PTR_ERR(env));
-
-        if (sbi->ll_cl != NULL) {
-                cl_stack_fini(env, sbi->ll_cl);
-                sbi->ll_cl = NULL;
-                sbi->ll_site = NULL;
-        }
-        cl_env_put(env, &refcheck);
-       cl_env_cache_purge(~0);
-
-       RETURN(0);
-}