Whamcloud - gitweb
LU-1030 osc: new IO engine implementation
[fs/lustre-release.git] / lustre / liblustre / llite_cl.c
1 /*
2  *   Copyright (c) 2007 Cluster File Systems, Inc.
3  *   Author: Nikita Danilov <nikita@clusterfs.com>
4  *
5  *   This file is part of Lustre, http://www.lustre.org.
6  *
7  *   Lustre is free software; you can redistribute it and/or
8  *   modify it under the terms of version 2 of the GNU General Public
9  *   License as published by the Free Software Foundation.
10  *
11  *   Lustre is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with Lustre; if not, write to the Free Software
18  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19  */
20
21 #define DEBUG_SUBSYSTEM S_LLITE
22
23 #include <stdlib.h>
24 #include <string.h>
25 #include <assert.h>
26 #include <time.h>
27 #include <sys/types.h>
28 #include <sys/stat.h>
29 #include <fcntl.h>
30 #include <sys/queue.h>
31 #ifndef __CYGWIN__
32 # include <sys/statvfs.h>
33 #else
34 # include <sys/statfs.h>
35 #endif
36
37 #include <liblustre.h>
38
39 #include <obd.h>
40 #include <obd_support.h>
41 #include <lustre_fid.h>
42 #include <lustre_lite.h>
43 #include <lustre_dlm.h>
44 #include <lustre_ver.h>
45 #include <lustre_mdc.h>
46 #include <cl_object.h>
47
48 #include "llite_lib.h"
49
50 /*
51  * slp_ prefix stands for "Sysio Library Posix". It corresponds to historical
52  * "llu_" prefix.
53  */
54
55 static int   slp_type_init     (struct lu_device_type *t);
56 static void  slp_type_fini     (struct lu_device_type *t);
57
58 static struct cl_page * slp_page_init(const struct lu_env *env,
59                                      struct cl_object *obj,
60                                      struct cl_page *page, cfs_page_t *vmpage);
61 static int   slp_attr_get     (const struct lu_env *env, struct cl_object *obj,
62                                struct cl_attr *attr);
63
64 static struct lu_device  *slp_device_alloc(const struct lu_env *env,
65                                            struct lu_device_type *t,
66                                            struct lustre_cfg *cfg);
67
68 static int slp_io_init(const struct lu_env *env, struct cl_object *obj,
69                        struct cl_io *io);
70 static struct slp_io *cl2slp_io(const struct lu_env *env,
71                                 const struct cl_io_slice *slice);
72
73
74 static void llu_free_user_page(struct page *page);
75
76 static const struct lu_object_operations      slp_lu_obj_ops;
77 static const struct lu_device_operations      slp_lu_ops;
78 static const struct cl_device_operations      slp_cl_ops;
79 static const struct cl_io_operations          ccc_io_ops;
80 static const struct lu_device_type_operations slp_device_type_ops;
81              //struct lu_device_type            slp_device_type;
82 static const struct cl_page_operations        slp_page_ops;
83 static const struct cl_page_operations        slp_transient_page_ops;
84 static const struct cl_lock_operations        slp_lock_ops;
85
86
87 /*****************************************************************************
88  *
89  * Slp device and device type functions.
90  *
91  */
92
93 void *slp_session_key_init(const struct lu_context *ctx,
94                                   struct lu_context_key *key)
95 {
96         struct slp_session *session;
97
98         OBD_ALLOC_PTR(session);
99         if (session == NULL)
100                 session = ERR_PTR(-ENOMEM);
101         return session;
102 }
103
104 void slp_session_key_fini(const struct lu_context *ctx,
105                                  struct lu_context_key *key, void *data)
106 {
107         struct slp_session *session = data;
108         OBD_FREE_PTR(session);
109 }
110
111 struct lu_context_key slp_session_key = {
112         .lct_tags = LCT_SESSION,
113         .lct_init = slp_session_key_init,
114         .lct_fini = slp_session_key_fini
115 };
116
117 /* type constructor/destructor: slp_type_{init,fini,start,stop}(). */
118 LU_TYPE_INIT_FINI(slp, &ccc_key, &ccc_session_key, &slp_session_key);
119
120 static struct lu_device *slp_device_alloc(const struct lu_env *env,
121                                           struct lu_device_type *t,
122                                           struct lustre_cfg *cfg)
123 {
124         return ccc_device_alloc(env, t, cfg, &slp_lu_ops, &slp_cl_ops);
125 }
126
127 static int slp_lock_init(const struct lu_env *env,
128                          struct cl_object *obj, struct cl_lock *lock,
129                          const struct cl_io *io)
130 {
131         return ccc_lock_init(env, obj, lock, io, &slp_lock_ops);
132 }
133
134 static const struct cl_object_operations slp_ops = {
135         .coo_page_init = slp_page_init,
136         .coo_lock_init = slp_lock_init,
137         .coo_io_init   = slp_io_init,
138         .coo_attr_get  = slp_attr_get,
139         .coo_attr_set  = ccc_attr_set,
140         .coo_conf_set  = ccc_conf_set,
141         .coo_glimpse   = ccc_object_glimpse
142 };
143
144 static int slp_object_print(const struct lu_env *env, void *cookie,
145                             lu_printer_t p, const struct lu_object *o)
146 {
147         struct ccc_object *obj   = lu2ccc(o);
148         struct inode      *inode = obj->cob_inode;
149         struct intnl_stat *st = NULL;
150
151         if (inode)
152                 st = llu_i2stat(inode);
153
154         return (*p)(env, cookie, LUSTRE_SLP_NAME"-object@%p(%p:%lu/%u)",
155                     obj, inode,
156                     st ? (unsigned long)st->st_ino : 0UL,
157                     inode ? (unsigned int)llu_i2info(inode)->lli_st_generation
158                     : 0);
159 }
160
161 static const struct lu_object_operations slp_lu_obj_ops = {
162         .loo_object_init      = ccc_object_init,
163         .loo_object_start     = NULL,
164         .loo_object_delete    = NULL,
165         .loo_object_release   = NULL,
166         .loo_object_free      = ccc_object_free,
167         .loo_object_print     = slp_object_print,
168         .loo_object_invariant = NULL
169 };
170
171 static struct lu_object *slp_object_alloc(const struct lu_env *env,
172                                           const struct lu_object_header *hdr,
173                                           struct lu_device *dev)
174 {
175         return ccc_object_alloc(env, hdr, dev, &slp_ops, &slp_lu_obj_ops);
176 }
177
178 static const struct lu_device_operations slp_lu_ops = {
179         .ldo_object_alloc      = slp_object_alloc
180 };
181
182 static const struct cl_device_operations slp_cl_ops = {
183         .cdo_req_init = ccc_req_init
184 };
185
186 static const struct lu_device_type_operations slp_device_type_ops = {
187         .ldto_init = slp_type_init,
188         .ldto_fini = slp_type_fini,
189
190         .ldto_start = slp_type_start,
191         .ldto_stop  = slp_type_stop,
192
193         .ldto_device_alloc = slp_device_alloc,
194         .ldto_device_free  = ccc_device_free,
195         .ldto_device_init  = ccc_device_init,
196         .ldto_device_fini  = ccc_device_fini
197 };
198
199 struct lu_device_type slp_device_type = {
200         .ldt_tags     = LU_DEVICE_CL,
201         .ldt_name     = LUSTRE_SLP_NAME,
202         .ldt_ops      = &slp_device_type_ops,
203         .ldt_ctx_tags = LCT_CL_THREAD
204 };
205
206 int slp_global_init(void)
207 {
208         int result;
209
210         result = ccc_global_init(&slp_device_type);
211         return result;
212 }
213
214 void slp_global_fini(void)
215 {
216         ccc_global_fini(&slp_device_type);
217 }
218
219 /*****************************************************************************
220  *
221  * Object operations.
222  *
223  */
224
225 static struct cl_page *slp_page_init(const struct lu_env *env,
226                                      struct cl_object *obj,
227                                      struct cl_page *page, cfs_page_t *vmpage)
228 {
229         struct ccc_page *cpg;
230         int result;
231
232         CLOBINVRNT(env, obj, ccc_object_invariant(obj));
233
234         OBD_ALLOC_PTR(cpg);
235         if (cpg != NULL) {
236                 cpg->cpg_page = vmpage;
237
238                 if (page->cp_type == CPT_CACHEABLE) {
239                         LBUG();
240                 } else {
241                         struct ccc_object *clobj = cl2ccc(obj);
242
243                         cl_page_slice_add(page, &cpg->cpg_cl, obj,
244                                           &slp_transient_page_ops);
245                         clobj->cob_transient_pages++;
246                 }
247                 result = 0;
248         } else
249                 result = -ENOMEM;
250         return ERR_PTR(result);
251 }
252
253 static int slp_io_init(const struct lu_env *env, struct cl_object *obj,
254                        struct cl_io *io)
255 {
256         struct ccc_io      *vio   = ccc_env_io(env);
257         int result = 0;
258
259         CLOBINVRNT(env, obj, ccc_object_invariant(obj));
260
261         cl_io_slice_add(io, &vio->cui_cl, obj, &ccc_io_ops);
262         if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE) {
263                 size_t count;
264
265                 count = io->u.ci_rw.crw_count;
266                 /* "If nbyte is 0, read() will return 0 and have no other
267                  *  results."  -- Single Unix Spec */
268                 if (count == 0)
269                         result = 1;
270                 else {
271                         vio->cui_tot_count = count;
272                         vio->cui_tot_nrsegs = 0;
273                 }
274
275         }
276         return result;
277 }
278
279 static int slp_attr_get(const struct lu_env *env, struct cl_object *obj,
280                         struct cl_attr *attr)
281 {
282         struct inode *inode = ccc_object_inode(obj);
283         struct intnl_stat *st = llu_i2stat(inode);
284
285         attr->cat_size = st->st_size;
286         attr->cat_blocks = st->st_blocks;
287         attr->cat_mtime  = st->st_mtime;
288         attr->cat_atime  = st->st_atime;
289         attr->cat_ctime  = st->st_ctime;
290         /* KMS is not known by this layer */
291         return 0; /* layers below have to fill in the rest */
292 }
293
294 /*****************************************************************************
295  *
296  * Page operations.
297  *
298  */
299
300 static void slp_page_fini_common(struct ccc_page *cp)
301 {
302         cfs_page_t *vmpage = cp->cpg_page;
303
304         LASSERT(vmpage != NULL);
305         llu_free_user_page(vmpage);
306         OBD_FREE_PTR(cp);
307 }
308
309 static void slp_page_completion_common(const struct lu_env *env,
310                                        struct ccc_page *cp, int ioret)
311 {
312         LASSERT(cp->cpg_cl.cpl_page->cp_sync_io != NULL);
313 }
314
315 static void slp_page_completion_read(const struct lu_env *env,
316                                      const struct cl_page_slice *slice,
317                                      int ioret)
318 {
319         struct ccc_page *cp      = cl2ccc_page(slice);
320         ENTRY;
321
322         slp_page_completion_common(env, cp, ioret);
323
324         EXIT;
325 }
326
327 static void slp_page_completion_write_common(const struct lu_env *env,
328                                              const struct cl_page_slice *slice,
329                                              int ioret)
330 {
331         struct ccc_page *cp     = cl2ccc_page(slice);
332
333         if (ioret == 0) {
334                 cp->cpg_write_queued = 0;
335                 /*
336                  * Only ioret == 0, write succeed, then this page could be
337                  * deleted from the pending_writing count.
338                  */
339         }
340         slp_page_completion_common(env, cp, ioret);
341 }
342
343 static int slp_page_is_vmlocked(const struct lu_env *env,
344                                 const struct cl_page_slice *slice)
345 {
346         return -EBUSY;
347 }
348
349 static void slp_transient_page_fini(const struct lu_env *env,
350                                     struct cl_page_slice *slice)
351 {
352         struct ccc_page *cp = cl2ccc_page(slice);
353         struct cl_page *clp = slice->cpl_page;
354         struct ccc_object *clobj = cl2ccc(clp->cp_obj);
355
356         slp_page_fini_common(cp);
357         clobj->cob_transient_pages--;
358 }
359
360
361 static const struct cl_page_operations slp_transient_page_ops = {
362         .cpo_own           = ccc_transient_page_own,
363         .cpo_assume        = ccc_transient_page_assume,
364         .cpo_unassume      = ccc_transient_page_unassume,
365         .cpo_disown        = ccc_transient_page_disown,
366         .cpo_discard       = ccc_transient_page_discard,
367         .cpo_vmpage        = ccc_page_vmpage,
368         .cpo_is_vmlocked   = slp_page_is_vmlocked,
369         .cpo_fini          = slp_transient_page_fini,
370         .cpo_is_under_lock = ccc_page_is_under_lock,
371         .io = {
372                 [CRT_READ] = {
373                         .cpo_completion  = slp_page_completion_read,
374                 },
375                 [CRT_WRITE] = {
376                         .cpo_completion  = slp_page_completion_write_common,
377                 }
378         }
379 };
380
381 /*****************************************************************************
382  *
383  * Lock operations.
384  *
385  */
386
387 static int slp_lock_enqueue(const struct lu_env *env,
388                            const struct cl_lock_slice *slice,
389                            struct cl_io *unused, __u32 enqflags)
390 {
391         CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
392
393         liblustre_wait_event(0);
394         return 0;
395 }
396
397 static const struct cl_lock_operations slp_lock_ops = {
398         .clo_delete    = ccc_lock_delete,
399         .clo_fini      = ccc_lock_fini,
400         .clo_enqueue   = slp_lock_enqueue,
401         .clo_wait      = ccc_lock_wait,
402         .clo_unuse     = ccc_lock_unuse,
403         .clo_fits_into = ccc_lock_fits_into,
404 };
405
406 /*****************************************************************************
407  *
408  * io operations.
409  *
410  */
411
412 static int slp_io_rw_lock(const struct lu_env *env,
413                           const struct cl_io_slice *ios)
414 {
415         struct ccc_io *cio = ccc_env_io(env);
416         struct cl_io *io   = ios->cis_io;
417         loff_t start;
418         loff_t end;
419
420         if (cl_io_is_append(io)) {
421                 start = 0;
422                 end   = OBD_OBJECT_EOF;
423         } else {
424                 start = io->u.ci_wr.wr.crw_pos;
425                 end   = start + io->u.ci_wr.wr.crw_count - 1;
426         }
427
428         ccc_io_update_iov(env, cio, io);
429
430         /*
431          * This acquires real DLM lock only in O_APPEND case, because of
432          * the io->ci_lockreq setting in llu_io_init().
433          */
434         LASSERT(ergo(cl_io_is_append(io), io->ci_lockreq == CILR_MANDATORY));
435         LASSERT(ergo(!cl_io_is_append(io), io->ci_lockreq == CILR_NEVER));
436         return ccc_io_one_lock(env, io, 0,
437                                io->ci_type == CIT_READ ? CLM_READ : CLM_WRITE,
438                                start, end);
439
440 }
441
442 static int slp_io_setattr_iter_init(const struct lu_env *env,
443                                     const struct cl_io_slice *ios)
444 {
445         return 0;
446 }
447
448 static int slp_io_setattr_start(const struct lu_env *env,
449                                 const struct cl_io_slice *ios)
450 {
451         return 0;
452 }
453
454 static struct page *llu_get_user_page(int index, void *addr, int offset,
455                                       int count)
456 {
457         struct page *page;
458
459         OBD_ALLOC_PTR(page);
460         if (!page)
461                 return NULL;
462         page->index = index;
463         page->addr = addr;
464         page->_offset = offset;
465         page->_count = count;
466
467         CFS_INIT_LIST_HEAD(&page->list);
468         CFS_INIT_LIST_HEAD(&page->_node);
469
470         return page;
471 }
472
473 static void llu_free_user_page(struct page *page)
474 {
475         OBD_FREE_PTR(page);
476 }
477
478
479 static int llu_queue_pio(const struct lu_env *env, struct cl_io *io,
480                          struct llu_io_group *group,
481                          char *buf, size_t count, loff_t pos)
482 {
483         struct cl_object *obj = io->ci_obj;
484         struct inode *inode = ccc_object_inode(obj);
485         struct intnl_stat *st = llu_i2stat(inode);
486         struct obd_export *exp = llu_i2obdexp(inode);
487         struct page *page;
488         int  rc = 0, ret_bytes = 0;
489         struct cl_page *clp;
490         struct cl_2queue *queue;
491         ENTRY;
492
493         if (!exp)
494                 RETURN(-EINVAL);
495
496         queue = &io->ci_queue;
497         cl_2queue_init(queue);
498
499
500         /* prepare the pages array */
501         do {
502                 unsigned long index, offset, bytes;
503
504                 offset = (pos & ~CFS_PAGE_MASK);
505                 index = pos >> CFS_PAGE_SHIFT;
506                 bytes = CFS_PAGE_SIZE - offset;
507                 if (bytes > count)
508                         bytes = count;
509
510                 /* prevent read beyond file range */
511                 if (/* local_lock && */
512                     io->ci_type == CIT_READ && pos + bytes >= st->st_size) {
513                         if (pos >= st->st_size)
514                                 break;
515                         bytes = st->st_size - pos;
516                 }
517
518                 /* prepare page for this index */
519                 page = llu_get_user_page(index, buf - offset, offset, bytes);
520                 if (!page) {
521                         rc = -ENOMEM;
522                         break;
523                 }
524
525                 clp = cl_page_find(env, obj,
526                                    cl_index(obj, pos),
527                                    page, CPT_TRANSIENT);
528
529                 if (IS_ERR(clp)) {
530                         rc = PTR_ERR(clp);
531                         break;
532                 }
533
534                 rc = cl_page_own(env, io, clp);
535                 if (rc) {
536                         LASSERT(clp->cp_state == CPS_FREEING);
537                         cl_page_put(env, clp);
538                         break;
539                 }
540
541                 cl_2queue_add(queue, clp);
542
543                 /* drop the reference count for cl_page_find, so that the page
544                  * will be freed in cl_2queue_fini. */
545                 cl_page_put(env, clp);
546
547                 cl_page_clip(env, clp, offset, offset+bytes);
548
549                 count -= bytes;
550                 pos += bytes;
551                 buf += bytes;
552
553                 group->lig_rwcount += bytes;
554                 ret_bytes += bytes;
555                 page++;
556         } while (count);
557
558         if (rc == 0) {
559                 enum cl_req_type iot;
560                 iot = io->ci_type == CIT_READ ? CRT_READ : CRT_WRITE;
561                 rc = cl_io_submit_sync(env, io, iot, queue, 0);
562         }
563
564         group->lig_rc = rc;
565
566         cl_2queue_discard(env, io, queue);
567         cl_2queue_disown(env, io, queue);
568         cl_2queue_fini(env, queue);
569
570         RETURN(ret_bytes);
571 }
572
573 static
574 struct llu_io_group * get_io_group(struct inode *inode, int maxpages,
575                                    struct lustre_rw_params *params)
576 {
577         struct llu_io_group *group;
578
579         OBD_ALLOC_PTR(group);
580         if (!group)
581                 return ERR_PTR(-ENOMEM);
582
583         group->lig_params = params;
584
585         return group;
586 }
587
588 static int max_io_pages(ssize_t len, int iovlen)
589 {
590         return (((len + CFS_PAGE_SIZE -1) / CFS_PAGE_SIZE) + 2 + iovlen - 1);
591 }
592
593 void put_io_group(struct llu_io_group *group)
594 {
595         OBD_FREE_PTR(group);
596 }
597
598 /**
599  * True, if \a io is a normal io, False for sendfile() / splice_{read|write}
600  */
601 int cl_is_normalio(const struct lu_env *env, const struct cl_io *io)
602 {
603         return 1;
604 }
605
606 static int slp_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
607 {
608         struct ccc_io     *cio   = cl2ccc_io(env, ios);
609         struct cl_io      *io    = ios->cis_io;
610         struct cl_object  *obj   = io->ci_obj;
611         struct inode      *inode = ccc_object_inode(obj);
612         int    err, ret;
613         loff_t pos;
614         long   cnt;
615         struct llu_io_group *iogroup;
616         struct lustre_rw_params p = {0};
617         int iovidx;
618         struct intnl_stat *st = llu_i2stat(inode);
619         struct llu_inode_info *lli = llu_i2info(inode);
620         struct llu_io_session *session = cl2slp_io(env, ios)->sio_session;
621         int write = io->ci_type == CIT_WRITE;
622         int exceed = 0;
623
624         CLOBINVRNT(env, obj, ccc_object_invariant(obj));
625
626         if (write) {
627                 pos = io->u.ci_wr.wr.crw_pos;
628                 cnt = io->u.ci_wr.wr.crw_count;
629         } else {
630                 pos = io->u.ci_rd.rd.crw_pos;
631                 cnt = io->u.ci_rd.rd.crw_count;
632         }
633         if (io->u.ci_wr.wr_append) {
634                 p.lrp_lock_mode = LCK_PW;
635         } else {
636                 p.lrp_brw_flags = OBD_BRW_SRVLOCK;
637                 p.lrp_lock_mode = LCK_NL;
638         }
639
640         iogroup = get_io_group(inode, max_io_pages(cnt, cio->cui_nrsegs), &p);
641         if (IS_ERR(iogroup))
642                 RETURN(PTR_ERR(iogroup));
643
644         err = ccc_prep_size(env, obj, io, pos, cnt, &exceed);
645         if (err != 0 || (write == 0 && exceed != 0))
646                 GOTO(out, err);
647
648         CDEBUG(D_INODE,
649                "%s ino %lu, %lu bytes, offset "LPU64", i_size "LPU64"\n",
650                write ? "Write" : "Read", (unsigned long)st->st_ino,
651                cnt, (__u64)pos, (__u64)st->st_size);
652
653         if (write && io->u.ci_wr.wr_append)
654                 pos = io->u.ci_wr.wr.crw_pos = st->st_size; /* XXX? Do we need to change io content too here? */
655                 /* XXX What about if one write syscall writes at 2 different offsets? */
656
657         for (iovidx = 0; iovidx < cio->cui_nrsegs; iovidx++) {
658                 char *buf = (char *) cio->cui_iov[iovidx].iov_base;
659                 long count = cio->cui_iov[iovidx].iov_len;
660
661                 if (!count)
662                         continue;
663                 if (cnt < count)
664                         count = cnt;
665                 if (IS_BAD_PTR(buf) || IS_BAD_PTR(buf + count)) {
666                         GOTO(out, err = -EFAULT);
667                 }
668
669                 if (io->ci_type == CIT_READ) {
670                         if (/* local_lock && */ pos >= st->st_size)
671                                 break;
672                 } else if (io->ci_type == CIT_WRITE) {
673                         if (pos >= lli->lli_maxbytes) {
674                                 GOTO(out, err = -EFBIG);
675                         }
676                         if (pos + count >= lli->lli_maxbytes)
677                                 count = lli->lli_maxbytes - pos;
678                 } else {
679                         LBUG();
680                 }
681
682                 ret = llu_queue_pio(env, io, iogroup, buf, count, pos);
683                 if (ret < 0) {
684                         GOTO(out, err = ret);
685                 } else {
686                         io->ci_nob += ret;
687                         pos += ret;
688                         cnt -= ret;
689                         if (io->ci_type == CIT_WRITE) {
690 //                                obd_adjust_kms(exp, lsm, pos, 0); // XXX
691                                 if (pos > st->st_size)
692                                         st->st_size = pos;
693                         }
694                         if (!cnt)
695                                 break;
696                 }
697         }
698         LASSERT(cnt == 0 || io->ci_type == CIT_READ); /* libsysio should guarantee this */
699
700         if (!iogroup->lig_rc)
701                 session->lis_rwcount += iogroup->lig_rwcount;
702         else if (!session->lis_rc)
703                 session->lis_rc = iogroup->lig_rc;
704         err = 0;
705
706 out:
707         put_io_group(iogroup);
708         return err;
709 }
710
711 static const struct cl_io_operations ccc_io_ops = {
712         .op = {
713                 [CIT_READ] = {
714                         .cio_fini      = ccc_io_fini,
715                         .cio_lock      = slp_io_rw_lock,
716                         .cio_start     = slp_io_start,
717                         .cio_end       = ccc_io_end,
718                         .cio_advance   = ccc_io_advance
719                 },
720                 [CIT_WRITE] = {
721                         .cio_fini      = ccc_io_fini,
722                         .cio_lock      = slp_io_rw_lock,
723                         .cio_start     = slp_io_start,
724                         .cio_end       = ccc_io_end,
725                         .cio_advance   = ccc_io_advance
726                 },
727                 [CIT_SETATTR] = {
728                         .cio_fini       = ccc_io_fini,
729                         .cio_iter_init  = slp_io_setattr_iter_init,
730                         .cio_start      = slp_io_setattr_start
731                 },
732                 [CIT_MISC] = {
733                         .cio_fini   = ccc_io_fini
734                 }
735         }
736 };
737
738 static struct slp_io *cl2slp_io(const struct lu_env *env,
739                                 const struct cl_io_slice *slice)
740 {
741         /* We call it just for assertion here */
742         cl2ccc_io(env, slice);
743
744         return slp_env_io(env);
745 }
746
747 /*****************************************************************************
748  *
749  * Temporary prototype thing: mirror obd-devices into cl devices.
750  *
751  */
752
753 int cl_sb_init(struct llu_sb_info *sbi)
754 {
755         struct cl_device  *cl;
756         struct lu_env     *env;
757         int rc = 0;
758         int refcheck;
759
760         env = cl_env_get(&refcheck);
761         if (IS_ERR(env))
762                 RETURN(PTR_ERR(env));
763
764         cl = cl_type_setup(env, NULL, &slp_device_type,
765                            sbi->ll_dt_exp->exp_obd->obd_lu_dev);
766         if (IS_ERR(cl))
767                 GOTO(out, rc = PTR_ERR(cl));
768
769         sbi->ll_cl = cl;
770         sbi->ll_site = cl2lu_dev(cl)->ld_site;
771 out:
772         cl_env_put(env, &refcheck);
773         RETURN(rc);
774 }
775
776 int cl_sb_fini(struct llu_sb_info *sbi)
777 {
778         struct lu_env *env;
779         int refcheck;
780
781         ENTRY;
782
783         env = cl_env_get(&refcheck);
784         if (IS_ERR(env))
785                 RETURN(PTR_ERR(env));
786
787         if (sbi->ll_cl != NULL) {
788                 cl_stack_fini(env, sbi->ll_cl);
789                 sbi->ll_cl = NULL;
790                 sbi->ll_site = NULL;
791         }
792         cl_env_put(env, &refcheck);
793         /*
794          * If mount failed (sbi->ll_cl == NULL), and this there are no other
795          * mounts, stop device types manually (this usually happens
796          * automatically when last device is destroyed).
797          */
798         lu_types_stop();
799         cl_env_cache_purge(~0);
800         RETURN(0);
801 }