Whamcloud - gitweb
LU-3382 build: clean unused link dependencies
[fs/lustre-release.git] / lustre / liblustre / llite_cl.c
1 /*
2  *   Copyright (c) 2007 Cluster File Systems, Inc.
3  *   Author: Nikita Danilov <nikita@clusterfs.com>
4  *
5  *   This file is part of Lustre, http://www.lustre.org.
6  *
7  *   Lustre is free software; you can redistribute it and/or
8  *   modify it under the terms of version 2 of the GNU General Public
9  *   License as published by the Free Software Foundation.
10  *
11  *   Lustre is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with Lustre; if not, write to the Free Software
18  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19  *
20  *   Copyright (c) 2011, 2013, Intel Corporation.
21  */
22
23 #define DEBUG_SUBSYSTEM S_LLITE
24
25 #include <errno.h>
26 #include <stdarg.h>
27 #include <stddef.h>
28 #include <sys/stat.h>
29 #include <sys/types.h>
30 #include <sys/uio.h>
31 #include <libcfs/libcfs.h>
32 #include <lustre/lustre_idl.h>
33 #include <liblustre.h>
34 #include <lclient.h>
35 #include <cl_object.h>
36 #include <lustre_export.h>
37 #include <lustre_lite.h>
38 #include <obd.h>
39 #include <obd_support.h>
40 #include "llite_lib.h"
41
42 /*
43  * slp_ prefix stands for "Sysio Library Posix". It corresponds to historical
44  * "llu_" prefix.
45  */
46
47 static int   slp_type_init     (struct lu_device_type *t);
48 static void  slp_type_fini     (struct lu_device_type *t);
49
50 static int slp_page_init(const struct lu_env *env, struct cl_object *obj,
51                          struct cl_page *page, pgoff_t index);
52 static int   slp_attr_get     (const struct lu_env *env, struct cl_object *obj,
53                                struct cl_attr *attr);
54
55 static struct lu_device  *slp_device_alloc(const struct lu_env *env,
56                                            struct lu_device_type *t,
57                                            struct lustre_cfg *cfg);
58
59 static int slp_io_init(const struct lu_env *env, struct cl_object *obj,
60                        struct cl_io *io);
61 static struct slp_io *cl2slp_io(const struct lu_env *env,
62                                 const struct cl_io_slice *slice);
63
64
65 static void llu_free_user_page(struct page *page);
66
67 static const struct lu_object_operations      slp_lu_obj_ops;
68 static const struct lu_device_operations      slp_lu_ops;
69 static const struct cl_device_operations      slp_cl_ops;
70 static const struct cl_io_operations          ccc_io_ops;
71 static const struct lu_device_type_operations slp_device_type_ops;
72              //struct lu_device_type            slp_device_type;
73 static const struct cl_page_operations        slp_transient_page_ops;
74 static const struct cl_lock_operations        slp_lock_ops;
75
76
77 /*****************************************************************************
78  *
79  * Slp device and device type functions.
80  *
81  */
82
83 static void *slp_session_key_init(const struct lu_context *ctx,
84                                   struct lu_context_key *key)
85 {
86         struct slp_session *session;
87
88         OBD_ALLOC_PTR(session);
89         if (session == NULL)
90                 session = ERR_PTR(-ENOMEM);
91         return session;
92 }
93
94 static void slp_session_key_fini(const struct lu_context *ctx,
95                                  struct lu_context_key *key, void *data)
96 {
97         struct slp_session *session = data;
98         OBD_FREE_PTR(session);
99 }
100
101 struct lu_context_key slp_session_key = {
102         .lct_tags = LCT_SESSION,
103         .lct_init = slp_session_key_init,
104         .lct_fini = slp_session_key_fini
105 };
106
107 /* type constructor/destructor: slp_type_{init,fini,start,stop}(). */
108 LU_TYPE_INIT_FINI(slp, &ccc_key, &ccc_session_key, &slp_session_key);
109
110 static struct lu_device *slp_device_alloc(const struct lu_env *env,
111                                           struct lu_device_type *t,
112                                           struct lustre_cfg *cfg)
113 {
114         return ccc_device_alloc(env, t, cfg, &slp_lu_ops, &slp_cl_ops);
115 }
116
117 static int slp_lock_init(const struct lu_env *env,
118                          struct cl_object *obj, struct cl_lock *lock,
119                          const struct cl_io *io)
120 {
121         return ccc_lock_init(env, obj, lock, io, &slp_lock_ops);
122 }
123
124 static const struct cl_object_operations slp_ops = {
125         .coo_page_init = slp_page_init,
126         .coo_lock_init = slp_lock_init,
127         .coo_io_init   = slp_io_init,
128         .coo_attr_get  = slp_attr_get,
129         .coo_attr_set  = ccc_attr_set,
130         .coo_conf_set  = ccc_conf_set,
131         .coo_glimpse   = ccc_object_glimpse
132 };
133
134 static int slp_object_print(const struct lu_env *env, void *cookie,
135                             lu_printer_t p, const struct lu_object *o)
136 {
137         struct ccc_object *obj   = lu2ccc(o);
138         struct inode      *inode = obj->cob_inode;
139         struct intnl_stat *st = NULL;
140
141         if (inode)
142                 st = llu_i2stat(inode);
143
144         return (*p)(env, cookie, LUSTRE_SLP_NAME"-object@%p(%p:%lu/%u)",
145                     obj, inode,
146                     st ? (unsigned long)st->st_ino : 0UL,
147                     inode ? (unsigned int)llu_i2info(inode)->lli_st_generation
148                     : 0);
149 }
150
151 static const struct lu_object_operations slp_lu_obj_ops = {
152         .loo_object_init      = ccc_object_init,
153         .loo_object_start     = NULL,
154         .loo_object_delete    = NULL,
155         .loo_object_release   = NULL,
156         .loo_object_free      = ccc_object_free,
157         .loo_object_print     = slp_object_print,
158         .loo_object_invariant = NULL
159 };
160
161 static struct lu_object *slp_object_alloc(const struct lu_env *env,
162                                           const struct lu_object_header *hdr,
163                                           struct lu_device *dev)
164 {
165         return ccc_object_alloc(env, hdr, dev, &slp_ops, &slp_lu_obj_ops);
166 }
167
168 static const struct lu_device_operations slp_lu_ops = {
169         .ldo_object_alloc      = slp_object_alloc
170 };
171
172 static const struct cl_device_operations slp_cl_ops = {
173         .cdo_req_init = ccc_req_init
174 };
175
176 static const struct lu_device_type_operations slp_device_type_ops = {
177         .ldto_init = slp_type_init,
178         .ldto_fini = slp_type_fini,
179
180         .ldto_start = slp_type_start,
181         .ldto_stop  = slp_type_stop,
182
183         .ldto_device_alloc = slp_device_alloc,
184         .ldto_device_free  = ccc_device_free,
185         .ldto_device_init  = ccc_device_init,
186         .ldto_device_fini  = ccc_device_fini
187 };
188
189 static struct lu_device_type slp_device_type = {
190         .ldt_tags     = LU_DEVICE_CL,
191         .ldt_name     = LUSTRE_SLP_NAME,
192         .ldt_ops      = &slp_device_type_ops,
193         .ldt_ctx_tags = LCT_CL_THREAD
194 };
195
196 int slp_global_init(void)
197 {
198         int result;
199
200         result = ccc_global_init(&slp_device_type);
201         return result;
202 }
203
204 void slp_global_fini(void)
205 {
206         ccc_global_fini(&slp_device_type);
207 }
208
209 /*****************************************************************************
210  *
211  * Object operations.
212  *
213  */
214
215 static int slp_page_init(const struct lu_env *env, struct cl_object *obj,
216                         struct cl_page *page, pgoff_t index)
217 {
218         struct ccc_page *cpg = cl_object_page_slice(obj, page);
219
220         CLOBINVRNT(env, obj, ccc_object_invariant(obj));
221
222         cpg->cpg_page = page->cp_vmpage;
223
224         if (page->cp_type == CPT_CACHEABLE) {
225                 LBUG();
226         } else {
227                 struct ccc_object *clobj = cl2ccc(obj);
228
229                 cl_page_slice_add(page, &cpg->cpg_cl, obj, index,
230                                   &slp_transient_page_ops);
231                 clobj->cob_transient_pages++;
232         }
233
234         return 0;
235 }
236
237 static int slp_io_init(const struct lu_env *env, struct cl_object *obj,
238                        struct cl_io *io)
239 {
240         struct ccc_io      *vio   = ccc_env_io(env);
241         int result = 0;
242
243         CLOBINVRNT(env, obj, ccc_object_invariant(obj));
244
245         cl_io_slice_add(io, &vio->cui_cl, obj, &ccc_io_ops);
246         if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE) {
247                 size_t count;
248
249                 count = io->u.ci_rw.crw_count;
250                 /* "If nbyte is 0, read() will return 0 and have no other
251                  *  results."  -- Single Unix Spec */
252                 if (count == 0)
253                         result = 1;
254                 else {
255                         vio->cui_tot_count = count;
256                         vio->cui_tot_nrsegs = 0;
257                 }
258
259         }
260         return result;
261 }
262
263 static int slp_attr_get(const struct lu_env *env, struct cl_object *obj,
264                         struct cl_attr *attr)
265 {
266         struct inode *inode = ccc_object_inode(obj);
267         struct intnl_stat *st = llu_i2stat(inode);
268
269         attr->cat_size = st->st_size;
270         attr->cat_blocks = st->st_blocks;
271         attr->cat_mtime  = st->st_mtime;
272         attr->cat_atime  = st->st_atime;
273         attr->cat_ctime  = st->st_ctime;
274         /* KMS is not known by this layer */
275         return 0; /* layers below have to fill in the rest */
276 }
277
278 /*****************************************************************************
279  *
280  * Page operations.
281  *
282  */
283
284 static void slp_page_fini_common(struct ccc_page *cp)
285 {
286         struct page *vmpage = cp->cpg_page;
287
288         LASSERT(vmpage != NULL);
289         llu_free_user_page(vmpage);
290         OBD_FREE_PTR(cp);
291 }
292
293 static void slp_page_completion_common(const struct lu_env *env,
294                                        struct ccc_page *cp, int ioret)
295 {
296         LASSERT(cp->cpg_cl.cpl_page->cp_sync_io != NULL);
297 }
298
299 static void slp_page_completion_read(const struct lu_env *env,
300                                      const struct cl_page_slice *slice,
301                                      int ioret)
302 {
303         struct ccc_page *cp      = cl2ccc_page(slice);
304         ENTRY;
305
306         slp_page_completion_common(env, cp, ioret);
307
308         EXIT;
309 }
310
311 static void slp_page_completion_write_common(const struct lu_env *env,
312                                              const struct cl_page_slice *slice,
313                                              int ioret)
314 {
315         struct ccc_page *cp     = cl2ccc_page(slice);
316
317         if (ioret == 0) {
318                 cp->cpg_write_queued = 0;
319                 /*
320                  * Only ioret == 0, write succeed, then this page could be
321                  * deleted from the pending_writing count.
322                  */
323         }
324         slp_page_completion_common(env, cp, ioret);
325 }
326
327 static int slp_page_is_vmlocked(const struct lu_env *env,
328                                 const struct cl_page_slice *slice)
329 {
330         return -EBUSY;
331 }
332
333 static void slp_transient_page_fini(const struct lu_env *env,
334                                     struct cl_page_slice *slice)
335 {
336         struct ccc_page *cp = cl2ccc_page(slice);
337         struct cl_page *clp = slice->cpl_page;
338         struct ccc_object *clobj = cl2ccc(clp->cp_obj);
339
340         slp_page_fini_common(cp);
341         clobj->cob_transient_pages--;
342 }
343
344
345 static const struct cl_page_operations slp_transient_page_ops = {
346         .cpo_own           = ccc_transient_page_own,
347         .cpo_assume        = ccc_transient_page_assume,
348         .cpo_unassume      = ccc_transient_page_unassume,
349         .cpo_disown        = ccc_transient_page_disown,
350         .cpo_discard       = ccc_transient_page_discard,
351         .cpo_is_vmlocked   = slp_page_is_vmlocked,
352         .cpo_fini          = slp_transient_page_fini,
353         .io = {
354                 [CRT_READ] = {
355                         .cpo_completion  = slp_page_completion_read,
356                 },
357                 [CRT_WRITE] = {
358                         .cpo_completion  = slp_page_completion_write_common,
359                 }
360         }
361 };
362
363 /*****************************************************************************
364  *
365  * Lock operations.
366  *
367  */
368
369 static int slp_lock_enqueue(const struct lu_env *env,
370                            const struct cl_lock_slice *slice,
371                            struct cl_io *unused, __u32 enqflags)
372 {
373         CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
374
375         liblustre_wait_event(0);
376         return 0;
377 }
378
379 static const struct cl_lock_operations slp_lock_ops = {
380         .clo_delete    = ccc_lock_delete,
381         .clo_fini      = ccc_lock_fini,
382         .clo_enqueue   = slp_lock_enqueue,
383         .clo_wait      = ccc_lock_wait,
384         .clo_unuse     = ccc_lock_unuse,
385         .clo_fits_into = ccc_lock_fits_into,
386 };
387
388 /*****************************************************************************
389  *
390  * io operations.
391  *
392  */
393
394 static int slp_io_rw_lock(const struct lu_env *env,
395                           const struct cl_io_slice *ios)
396 {
397         struct ccc_io *cio = ccc_env_io(env);
398         struct cl_io *io   = ios->cis_io;
399         loff_t start;
400         loff_t end;
401
402         if (cl_io_is_append(io)) {
403                 start = 0;
404                 end   = OBD_OBJECT_EOF;
405         } else {
406                 start = io->u.ci_wr.wr.crw_pos;
407                 end   = start + io->u.ci_wr.wr.crw_count - 1;
408         }
409
410         ccc_io_update_iov(env, cio, io);
411
412         /*
413          * This acquires real DLM lock only in O_APPEND case, because of
414          * the io->ci_lockreq setting in llu_io_init().
415          */
416         LASSERT(ergo(cl_io_is_append(io), io->ci_lockreq == CILR_MANDATORY));
417         LASSERT(ergo(!cl_io_is_append(io), io->ci_lockreq == CILR_NEVER));
418         return ccc_io_one_lock(env, io, 0,
419                                io->ci_type == CIT_READ ? CLM_READ : CLM_WRITE,
420                                start, end);
421
422 }
423
424 static int slp_io_setattr_iter_init(const struct lu_env *env,
425                                     const struct cl_io_slice *ios)
426 {
427         return 0;
428 }
429
430 static int slp_io_setattr_start(const struct lu_env *env,
431                                 const struct cl_io_slice *ios)
432 {
433         return 0;
434 }
435
436 static struct page *llu_get_user_page(int index, void *addr, int offset,
437                                       int count)
438 {
439         struct page *page;
440
441         OBD_ALLOC_PTR(page);
442         if (!page)
443                 return NULL;
444         page->index = index;
445         page->addr = addr;
446         page->_offset = offset;
447         page->_count = count;
448
449         CFS_INIT_LIST_HEAD(&page->list);
450         CFS_INIT_LIST_HEAD(&page->_node);
451
452         return page;
453 }
454
455 static void llu_free_user_page(struct page *page)
456 {
457         OBD_FREE_PTR(page);
458 }
459
460
461 static int llu_queue_pio(const struct lu_env *env, struct cl_io *io,
462                          struct llu_io_group *group,
463                          char *buf, size_t count, loff_t pos)
464 {
465         struct cl_object *obj = io->ci_obj;
466         struct inode *inode = ccc_object_inode(obj);
467         struct intnl_stat *st = llu_i2stat(inode);
468         struct obd_export *exp = llu_i2obdexp(inode);
469         struct page *page;
470         int  rc = 0, ret_bytes = 0;
471         struct cl_page *clp;
472         struct cl_2queue *queue;
473         ENTRY;
474
475         if (!exp)
476                 RETURN(-EINVAL);
477
478         queue = &io->ci_queue;
479         cl_2queue_init(queue);
480
481
482         /* prepare the pages array */
483         do {
484                 unsigned long index, offset, bytes;
485
486                 offset = (pos & ~CFS_PAGE_MASK);
487                 index = pos >> PAGE_CACHE_SHIFT;
488                 bytes = PAGE_CACHE_SIZE - offset;
489                 if (bytes > count)
490                         bytes = count;
491
492                 /* prevent read beyond file range */
493                 if (/* local_lock && */
494                     io->ci_type == CIT_READ && pos + bytes >= st->st_size) {
495                         if (pos >= st->st_size)
496                                 break;
497                         bytes = st->st_size - pos;
498                 }
499
500                 /* prepare page for this index */
501                 page = llu_get_user_page(index, buf - offset, offset, bytes);
502                 if (!page) {
503                         rc = -ENOMEM;
504                         break;
505                 }
506
507                 clp = cl_page_find(env, obj,
508                                    cl_index(obj, pos),
509                                    page, CPT_TRANSIENT);
510
511                 if (IS_ERR(clp)) {
512                         rc = PTR_ERR(clp);
513                         break;
514                 }
515
516                 rc = cl_page_own(env, io, clp);
517                 if (rc) {
518                         LASSERT(clp->cp_state == CPS_FREEING);
519                         cl_page_put(env, clp);
520                         break;
521                 }
522
523                 cl_2queue_add(queue, clp);
524
525                 /* drop the reference count for cl_page_find, so that the page
526                  * will be freed in cl_2queue_fini. */
527                 cl_page_put(env, clp);
528
529                 cl_page_clip(env, clp, offset, offset+bytes);
530
531                 count -= bytes;
532                 pos += bytes;
533                 buf += bytes;
534
535                 group->lig_rwcount += bytes;
536                 ret_bytes += bytes;
537                 page++;
538         } while (count);
539
540         if (rc == 0) {
541                 enum cl_req_type iot;
542                 iot = io->ci_type == CIT_READ ? CRT_READ : CRT_WRITE;
543                 rc = cl_io_submit_sync(env, io, iot, queue, 0);
544         }
545
546         group->lig_rc = rc;
547
548         cl_2queue_discard(env, io, queue);
549         cl_2queue_disown(env, io, queue);
550         cl_2queue_fini(env, queue);
551
552         RETURN(ret_bytes);
553 }
554
555 static
556 struct llu_io_group *get_io_group(struct inode *inode, int maxpages)
557 {
558         struct llu_io_group *group;
559
560         OBD_ALLOC_PTR(group);
561         if (!group)
562                 return ERR_PTR(-ENOMEM);
563
564         return group;
565 }
566
567 static int max_io_pages(ssize_t len, int iovlen)
568 {
569         return ((len + PAGE_CACHE_SIZE - 1) / PAGE_CACHE_SIZE) +
570                 2 + iovlen - 1;
571 }
572
573 void put_io_group(struct llu_io_group *group)
574 {
575         OBD_FREE_PTR(group);
576 }
577
578 /**
579  * True, if \a io is a normal io, False for sendfile() / splice_{read|write}
580  */
581 int cl_is_normalio(const struct lu_env *env, const struct cl_io *io)
582 {
583         return 1;
584 }
585
586 static int slp_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
587 {
588         struct ccc_io     *cio   = cl2ccc_io(env, ios);
589         struct cl_io      *io    = ios->cis_io;
590         struct cl_object  *obj   = io->ci_obj;
591         struct inode      *inode = ccc_object_inode(obj);
592         int    err, ret;
593         loff_t pos;
594         long   cnt;
595         struct llu_io_group *iogroup;
596         int iovidx;
597         struct intnl_stat *st = llu_i2stat(inode);
598         struct llu_inode_info *lli = llu_i2info(inode);
599         struct llu_io_session *session = cl2slp_io(env, ios)->sio_session;
600         int write = io->ci_type == CIT_WRITE;
601         int exceed = 0;
602
603         CLOBINVRNT(env, obj, ccc_object_invariant(obj));
604
605         if (write) {
606                 pos = io->u.ci_wr.wr.crw_pos;
607                 cnt = io->u.ci_wr.wr.crw_count;
608         } else {
609                 pos = io->u.ci_rd.rd.crw_pos;
610                 cnt = io->u.ci_rd.rd.crw_count;
611         }
612
613         iogroup = get_io_group(inode, max_io_pages(cnt, cio->cui_nrsegs));
614         if (IS_ERR(iogroup))
615                 RETURN(PTR_ERR(iogroup));
616
617         err = ccc_prep_size(env, obj, io, pos, cnt, &exceed);
618         if (err != 0 || (write == 0 && exceed != 0))
619                 GOTO(out, err);
620
621         CDEBUG(D_INODE,
622                "%s ino %lu, %lu bytes, offset "LPU64", i_size "LPU64"\n",
623                write ? "Write" : "Read", (unsigned long)st->st_ino,
624                cnt, (__u64)pos, (__u64)st->st_size);
625
626         if (write && io->u.ci_wr.wr_append)
627                 pos = io->u.ci_wr.wr.crw_pos = st->st_size; /* XXX? Do we need to change io content too here? */
628                 /* XXX What about if one write syscall writes at 2 different offsets? */
629
630         for (iovidx = 0; iovidx < cio->cui_nrsegs; iovidx++) {
631                 char *buf = (char *) cio->cui_iov[iovidx].iov_base;
632                 long count = cio->cui_iov[iovidx].iov_len;
633
634                 if (!count)
635                         continue;
636                 if (cnt < count)
637                         count = cnt;
638                 if (IS_BAD_PTR(buf) || IS_BAD_PTR(buf + count)) {
639                         GOTO(out, err = -EFAULT);
640                 }
641
642                 if (io->ci_type == CIT_READ) {
643                         if (/* local_lock && */ pos >= st->st_size)
644                                 break;
645                 } else if (io->ci_type == CIT_WRITE) {
646                         if (pos >= lli->lli_maxbytes) {
647                                 GOTO(out, err = -EFBIG);
648                         }
649                         if (pos + count >= lli->lli_maxbytes)
650                                 count = lli->lli_maxbytes - pos;
651                 } else {
652                         LBUG();
653                 }
654
655                 ret = llu_queue_pio(env, io, iogroup, buf, count, pos);
656                 if (ret < 0) {
657                         GOTO(out, err = ret);
658                 } else {
659                         io->ci_nob += ret;
660                         pos += ret;
661                         cnt -= ret;
662                         if (io->ci_type == CIT_WRITE) {
663 //                                obd_adjust_kms(exp, lsm, pos, 0); // XXX
664                                 if (pos > st->st_size)
665                                         st->st_size = pos;
666                         }
667                         if (!cnt)
668                                 break;
669                 }
670         }
671         LASSERT(cnt == 0 || io->ci_type == CIT_READ); /* libsysio should guarantee this */
672
673         if (!iogroup->lig_rc)
674                 session->lis_rwcount += iogroup->lig_rwcount;
675         else if (!session->lis_rc)
676                 session->lis_rc = iogroup->lig_rc;
677         err = 0;
678
679 out:
680         put_io_group(iogroup);
681         return err;
682 }
683
684 static const struct cl_io_operations ccc_io_ops = {
685         .op = {
686                 [CIT_READ] = {
687                         .cio_fini      = ccc_io_fini,
688                         .cio_lock      = slp_io_rw_lock,
689                         .cio_start     = slp_io_start,
690                         .cio_end       = ccc_io_end,
691                         .cio_advance   = ccc_io_advance
692                 },
693                 [CIT_WRITE] = {
694                         .cio_fini      = ccc_io_fini,
695                         .cio_lock      = slp_io_rw_lock,
696                         .cio_start     = slp_io_start,
697                         .cio_end       = ccc_io_end,
698                         .cio_advance   = ccc_io_advance
699                 },
700                 [CIT_SETATTR] = {
701                         .cio_fini       = ccc_io_fini,
702                         .cio_iter_init  = slp_io_setattr_iter_init,
703                         .cio_start      = slp_io_setattr_start
704                 },
705                 [CIT_MISC] = {
706                         .cio_fini   = ccc_io_fini
707                 }
708         }
709 };
710
711 static struct slp_io *cl2slp_io(const struct lu_env *env,
712                                 const struct cl_io_slice *slice)
713 {
714         /* We call it just for assertion here */
715         cl2ccc_io(env, slice);
716
717         return slp_env_io(env);
718 }
719
720 /*****************************************************************************
721  *
722  * Temporary prototype thing: mirror obd-devices into cl devices.
723  *
724  */
725
726 int cl_sb_init(struct llu_sb_info *sbi)
727 {
728         struct cl_device  *cl;
729         struct lu_env     *env;
730         int rc = 0;
731         int refcheck;
732
733         env = cl_env_get(&refcheck);
734         if (IS_ERR(env))
735                 RETURN(PTR_ERR(env));
736
737         cl = cl_type_setup(env, NULL, &slp_device_type,
738                            sbi->ll_dt_exp->exp_obd->obd_lu_dev);
739         if (IS_ERR(cl))
740                 GOTO(out, rc = PTR_ERR(cl));
741
742         sbi->ll_cl = cl;
743         sbi->ll_site = cl2lu_dev(cl)->ld_site;
744 out:
745         cl_env_put(env, &refcheck);
746         RETURN(rc);
747 }
748
749 int cl_sb_fini(struct llu_sb_info *sbi)
750 {
751         struct lu_env *env;
752         int refcheck;
753
754         ENTRY;
755
756         env = cl_env_get(&refcheck);
757         if (IS_ERR(env))
758                 RETURN(PTR_ERR(env));
759
760         if (sbi->ll_cl != NULL) {
761                 cl_stack_fini(env, sbi->ll_cl);
762                 sbi->ll_cl = NULL;
763                 sbi->ll_site = NULL;
764         }
765         cl_env_put(env, &refcheck);
766         cl_env_cache_purge(~0);
767
768         RETURN(0);
769 }