Whamcloud - gitweb
LU-1866 lfsck: enhance otable-based iteration
[fs/lustre-release.git] / lustre / liblustre / llite_cl.c
1 /*
2  *   Copyright (c) 2007 Cluster File Systems, Inc.
3  *   Author: Nikita Danilov <nikita@clusterfs.com>
4  *
5  *   This file is part of Lustre, http://www.lustre.org.
6  *
7  *   Lustre is free software; you can redistribute it and/or
8  *   modify it under the terms of version 2 of the GNU General Public
9  *   License as published by the Free Software Foundation.
10  *
11  *   Lustre is distributed in the hope that it will be useful,
12  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *   GNU General Public License for more details.
15  *
16  *   You should have received a copy of the GNU General Public License
17  *   along with Lustre; if not, write to the Free Software
18  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19  *
20  *   Copyright (c) 2011, 2012, Intel Corporation.
21  */
22
23 #define DEBUG_SUBSYSTEM S_LLITE
24
25 #include <stdlib.h>
26 #include <string.h>
27 #include <assert.h>
28 #include <time.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <fcntl.h>
32 #include <sys/queue.h>
33 #ifndef __CYGWIN__
34 # include <sys/statvfs.h>
35 #else
36 # include <sys/statfs.h>
37 #endif
38
39 #include <liblustre.h>
40
41 #include <obd.h>
42 #include <obd_support.h>
43 #include <lustre_fid.h>
44 #include <lustre_lite.h>
45 #include <lustre_dlm.h>
46 #include <lustre_ver.h>
47 #include <lustre_mdc.h>
48 #include <cl_object.h>
49
50 #include "llite_lib.h"
51
52 /*
53  * slp_ prefix stands for "Sysio Library Posix". It corresponds to historical
54  * "llu_" prefix.
55  */
56
57 static int   slp_type_init     (struct lu_device_type *t);
58 static void  slp_type_fini     (struct lu_device_type *t);
59
60 static int slp_page_init(const struct lu_env *env, struct cl_object *obj,
61                          struct cl_page *page, cfs_page_t *vmpage);
62 static int   slp_attr_get     (const struct lu_env *env, struct cl_object *obj,
63                                struct cl_attr *attr);
64
65 static struct lu_device  *slp_device_alloc(const struct lu_env *env,
66                                            struct lu_device_type *t,
67                                            struct lustre_cfg *cfg);
68
69 static int slp_io_init(const struct lu_env *env, struct cl_object *obj,
70                        struct cl_io *io);
71 static struct slp_io *cl2slp_io(const struct lu_env *env,
72                                 const struct cl_io_slice *slice);
73
74
75 static void llu_free_user_page(struct page *page);
76
77 static const struct lu_object_operations      slp_lu_obj_ops;
78 static const struct lu_device_operations      slp_lu_ops;
79 static const struct cl_device_operations      slp_cl_ops;
80 static const struct cl_io_operations          ccc_io_ops;
81 static const struct lu_device_type_operations slp_device_type_ops;
82              //struct lu_device_type            slp_device_type;
83 static const struct cl_page_operations        slp_page_ops;
84 static const struct cl_page_operations        slp_transient_page_ops;
85 static const struct cl_lock_operations        slp_lock_ops;
86
87
88 /*****************************************************************************
89  *
90  * Slp device and device type functions.
91  *
92  */
93
94 void *slp_session_key_init(const struct lu_context *ctx,
95                                   struct lu_context_key *key)
96 {
97         struct slp_session *session;
98
99         OBD_ALLOC_PTR(session);
100         if (session == NULL)
101                 session = ERR_PTR(-ENOMEM);
102         return session;
103 }
104
105 void slp_session_key_fini(const struct lu_context *ctx,
106                                  struct lu_context_key *key, void *data)
107 {
108         struct slp_session *session = data;
109         OBD_FREE_PTR(session);
110 }
111
112 struct lu_context_key slp_session_key = {
113         .lct_tags = LCT_SESSION,
114         .lct_init = slp_session_key_init,
115         .lct_fini = slp_session_key_fini
116 };
117
118 /* type constructor/destructor: slp_type_{init,fini,start,stop}(). */
119 LU_TYPE_INIT_FINI(slp, &ccc_key, &ccc_session_key, &slp_session_key);
120
121 static struct lu_device *slp_device_alloc(const struct lu_env *env,
122                                           struct lu_device_type *t,
123                                           struct lustre_cfg *cfg)
124 {
125         return ccc_device_alloc(env, t, cfg, &slp_lu_ops, &slp_cl_ops);
126 }
127
128 static int slp_lock_init(const struct lu_env *env,
129                          struct cl_object *obj, struct cl_lock *lock,
130                          const struct cl_io *io)
131 {
132         return ccc_lock_init(env, obj, lock, io, &slp_lock_ops);
133 }
134
135 static const struct cl_object_operations slp_ops = {
136         .coo_page_init = slp_page_init,
137         .coo_lock_init = slp_lock_init,
138         .coo_io_init   = slp_io_init,
139         .coo_attr_get  = slp_attr_get,
140         .coo_attr_set  = ccc_attr_set,
141         .coo_conf_set  = ccc_conf_set,
142         .coo_glimpse   = ccc_object_glimpse
143 };
144
145 static int slp_object_print(const struct lu_env *env, void *cookie,
146                             lu_printer_t p, const struct lu_object *o)
147 {
148         struct ccc_object *obj   = lu2ccc(o);
149         struct inode      *inode = obj->cob_inode;
150         struct intnl_stat *st = NULL;
151
152         if (inode)
153                 st = llu_i2stat(inode);
154
155         return (*p)(env, cookie, LUSTRE_SLP_NAME"-object@%p(%p:%lu/%u)",
156                     obj, inode,
157                     st ? (unsigned long)st->st_ino : 0UL,
158                     inode ? (unsigned int)llu_i2info(inode)->lli_st_generation
159                     : 0);
160 }
161
162 static const struct lu_object_operations slp_lu_obj_ops = {
163         .loo_object_init      = ccc_object_init,
164         .loo_object_start     = NULL,
165         .loo_object_delete    = NULL,
166         .loo_object_release   = NULL,
167         .loo_object_free      = ccc_object_free,
168         .loo_object_print     = slp_object_print,
169         .loo_object_invariant = NULL
170 };
171
172 static struct lu_object *slp_object_alloc(const struct lu_env *env,
173                                           const struct lu_object_header *hdr,
174                                           struct lu_device *dev)
175 {
176         return ccc_object_alloc(env, hdr, dev, &slp_ops, &slp_lu_obj_ops);
177 }
178
179 static const struct lu_device_operations slp_lu_ops = {
180         .ldo_object_alloc      = slp_object_alloc
181 };
182
183 static const struct cl_device_operations slp_cl_ops = {
184         .cdo_req_init = ccc_req_init
185 };
186
187 static const struct lu_device_type_operations slp_device_type_ops = {
188         .ldto_init = slp_type_init,
189         .ldto_fini = slp_type_fini,
190
191         .ldto_start = slp_type_start,
192         .ldto_stop  = slp_type_stop,
193
194         .ldto_device_alloc = slp_device_alloc,
195         .ldto_device_free  = ccc_device_free,
196         .ldto_device_init  = ccc_device_init,
197         .ldto_device_fini  = ccc_device_fini
198 };
199
200 struct lu_device_type slp_device_type = {
201         .ldt_tags     = LU_DEVICE_CL,
202         .ldt_name     = LUSTRE_SLP_NAME,
203         .ldt_ops      = &slp_device_type_ops,
204         .ldt_ctx_tags = LCT_CL_THREAD
205 };
206
207 int slp_global_init(void)
208 {
209         int result;
210
211         result = ccc_global_init(&slp_device_type);
212         return result;
213 }
214
215 void slp_global_fini(void)
216 {
217         ccc_global_fini(&slp_device_type);
218 }
219
220 /*****************************************************************************
221  *
222  * Object operations.
223  *
224  */
225
226 static int slp_page_init(const struct lu_env *env, struct cl_object *obj,
227                         struct cl_page *page, cfs_page_t *vmpage)
228 {
229         struct ccc_page *cpg = cl_object_page_slice(obj, page);
230
231         CLOBINVRNT(env, obj, ccc_object_invariant(obj));
232
233         cpg->cpg_page = vmpage;
234
235         if (page->cp_type == CPT_CACHEABLE) {
236                 LBUG();
237         } else {
238                 struct ccc_object *clobj = cl2ccc(obj);
239
240                 cl_page_slice_add(page, &cpg->cpg_cl, obj,
241                                 &slp_transient_page_ops);
242                 clobj->cob_transient_pages++;
243         }
244
245         return 0;
246 }
247
248 static int slp_io_init(const struct lu_env *env, struct cl_object *obj,
249                        struct cl_io *io)
250 {
251         struct ccc_io      *vio   = ccc_env_io(env);
252         int result = 0;
253
254         CLOBINVRNT(env, obj, ccc_object_invariant(obj));
255
256         cl_io_slice_add(io, &vio->cui_cl, obj, &ccc_io_ops);
257         if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE) {
258                 size_t count;
259
260                 count = io->u.ci_rw.crw_count;
261                 /* "If nbyte is 0, read() will return 0 and have no other
262                  *  results."  -- Single Unix Spec */
263                 if (count == 0)
264                         result = 1;
265                 else {
266                         vio->cui_tot_count = count;
267                         vio->cui_tot_nrsegs = 0;
268                 }
269
270         }
271         return result;
272 }
273
274 static int slp_attr_get(const struct lu_env *env, struct cl_object *obj,
275                         struct cl_attr *attr)
276 {
277         struct inode *inode = ccc_object_inode(obj);
278         struct intnl_stat *st = llu_i2stat(inode);
279
280         attr->cat_size = st->st_size;
281         attr->cat_blocks = st->st_blocks;
282         attr->cat_mtime  = st->st_mtime;
283         attr->cat_atime  = st->st_atime;
284         attr->cat_ctime  = st->st_ctime;
285         /* KMS is not known by this layer */
286         return 0; /* layers below have to fill in the rest */
287 }
288
289 /*****************************************************************************
290  *
291  * Page operations.
292  *
293  */
294
295 static void slp_page_fini_common(struct ccc_page *cp)
296 {
297         cfs_page_t *vmpage = cp->cpg_page;
298
299         LASSERT(vmpage != NULL);
300         llu_free_user_page(vmpage);
301         OBD_FREE_PTR(cp);
302 }
303
304 static void slp_page_completion_common(const struct lu_env *env,
305                                        struct ccc_page *cp, int ioret)
306 {
307         LASSERT(cp->cpg_cl.cpl_page->cp_sync_io != NULL);
308 }
309
310 static void slp_page_completion_read(const struct lu_env *env,
311                                      const struct cl_page_slice *slice,
312                                      int ioret)
313 {
314         struct ccc_page *cp      = cl2ccc_page(slice);
315         ENTRY;
316
317         slp_page_completion_common(env, cp, ioret);
318
319         EXIT;
320 }
321
322 static void slp_page_completion_write_common(const struct lu_env *env,
323                                              const struct cl_page_slice *slice,
324                                              int ioret)
325 {
326         struct ccc_page *cp     = cl2ccc_page(slice);
327
328         if (ioret == 0) {
329                 cp->cpg_write_queued = 0;
330                 /*
331                  * Only ioret == 0, write succeed, then this page could be
332                  * deleted from the pending_writing count.
333                  */
334         }
335         slp_page_completion_common(env, cp, ioret);
336 }
337
338 static int slp_page_is_vmlocked(const struct lu_env *env,
339                                 const struct cl_page_slice *slice)
340 {
341         return -EBUSY;
342 }
343
344 static void slp_transient_page_fini(const struct lu_env *env,
345                                     struct cl_page_slice *slice)
346 {
347         struct ccc_page *cp = cl2ccc_page(slice);
348         struct cl_page *clp = slice->cpl_page;
349         struct ccc_object *clobj = cl2ccc(clp->cp_obj);
350
351         slp_page_fini_common(cp);
352         clobj->cob_transient_pages--;
353 }
354
355
356 static const struct cl_page_operations slp_transient_page_ops = {
357         .cpo_own           = ccc_transient_page_own,
358         .cpo_assume        = ccc_transient_page_assume,
359         .cpo_unassume      = ccc_transient_page_unassume,
360         .cpo_disown        = ccc_transient_page_disown,
361         .cpo_discard       = ccc_transient_page_discard,
362         .cpo_vmpage        = ccc_page_vmpage,
363         .cpo_is_vmlocked   = slp_page_is_vmlocked,
364         .cpo_fini          = slp_transient_page_fini,
365         .cpo_is_under_lock = ccc_page_is_under_lock,
366         .io = {
367                 [CRT_READ] = {
368                         .cpo_completion  = slp_page_completion_read,
369                 },
370                 [CRT_WRITE] = {
371                         .cpo_completion  = slp_page_completion_write_common,
372                 }
373         }
374 };
375
376 /*****************************************************************************
377  *
378  * Lock operations.
379  *
380  */
381
382 static int slp_lock_enqueue(const struct lu_env *env,
383                            const struct cl_lock_slice *slice,
384                            struct cl_io *unused, __u32 enqflags)
385 {
386         CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
387
388         liblustre_wait_event(0);
389         return 0;
390 }
391
392 static const struct cl_lock_operations slp_lock_ops = {
393         .clo_delete    = ccc_lock_delete,
394         .clo_fini      = ccc_lock_fini,
395         .clo_enqueue   = slp_lock_enqueue,
396         .clo_wait      = ccc_lock_wait,
397         .clo_unuse     = ccc_lock_unuse,
398         .clo_fits_into = ccc_lock_fits_into,
399 };
400
401 /*****************************************************************************
402  *
403  * io operations.
404  *
405  */
406
407 static int slp_io_rw_lock(const struct lu_env *env,
408                           const struct cl_io_slice *ios)
409 {
410         struct ccc_io *cio = ccc_env_io(env);
411         struct cl_io *io   = ios->cis_io;
412         loff_t start;
413         loff_t end;
414
415         if (cl_io_is_append(io)) {
416                 start = 0;
417                 end   = OBD_OBJECT_EOF;
418         } else {
419                 start = io->u.ci_wr.wr.crw_pos;
420                 end   = start + io->u.ci_wr.wr.crw_count - 1;
421         }
422
423         ccc_io_update_iov(env, cio, io);
424
425         /*
426          * This acquires real DLM lock only in O_APPEND case, because of
427          * the io->ci_lockreq setting in llu_io_init().
428          */
429         LASSERT(ergo(cl_io_is_append(io), io->ci_lockreq == CILR_MANDATORY));
430         LASSERT(ergo(!cl_io_is_append(io), io->ci_lockreq == CILR_NEVER));
431         return ccc_io_one_lock(env, io, 0,
432                                io->ci_type == CIT_READ ? CLM_READ : CLM_WRITE,
433                                start, end);
434
435 }
436
437 static int slp_io_setattr_iter_init(const struct lu_env *env,
438                                     const struct cl_io_slice *ios)
439 {
440         return 0;
441 }
442
443 static int slp_io_setattr_start(const struct lu_env *env,
444                                 const struct cl_io_slice *ios)
445 {
446         return 0;
447 }
448
449 static struct page *llu_get_user_page(int index, void *addr, int offset,
450                                       int count)
451 {
452         struct page *page;
453
454         OBD_ALLOC_PTR(page);
455         if (!page)
456                 return NULL;
457         page->index = index;
458         page->addr = addr;
459         page->_offset = offset;
460         page->_count = count;
461
462         CFS_INIT_LIST_HEAD(&page->list);
463         CFS_INIT_LIST_HEAD(&page->_node);
464
465         return page;
466 }
467
468 static void llu_free_user_page(struct page *page)
469 {
470         OBD_FREE_PTR(page);
471 }
472
473
474 static int llu_queue_pio(const struct lu_env *env, struct cl_io *io,
475                          struct llu_io_group *group,
476                          char *buf, size_t count, loff_t pos)
477 {
478         struct cl_object *obj = io->ci_obj;
479         struct inode *inode = ccc_object_inode(obj);
480         struct intnl_stat *st = llu_i2stat(inode);
481         struct obd_export *exp = llu_i2obdexp(inode);
482         struct page *page;
483         int  rc = 0, ret_bytes = 0;
484         struct cl_page *clp;
485         struct cl_2queue *queue;
486         ENTRY;
487
488         if (!exp)
489                 RETURN(-EINVAL);
490
491         queue = &io->ci_queue;
492         cl_2queue_init(queue);
493
494
495         /* prepare the pages array */
496         do {
497                 unsigned long index, offset, bytes;
498
499                 offset = (pos & ~CFS_PAGE_MASK);
500                 index = pos >> CFS_PAGE_SHIFT;
501                 bytes = CFS_PAGE_SIZE - offset;
502                 if (bytes > count)
503                         bytes = count;
504
505                 /* prevent read beyond file range */
506                 if (/* local_lock && */
507                     io->ci_type == CIT_READ && pos + bytes >= st->st_size) {
508                         if (pos >= st->st_size)
509                                 break;
510                         bytes = st->st_size - pos;
511                 }
512
513                 /* prepare page for this index */
514                 page = llu_get_user_page(index, buf - offset, offset, bytes);
515                 if (!page) {
516                         rc = -ENOMEM;
517                         break;
518                 }
519
520                 clp = cl_page_find(env, obj,
521                                    cl_index(obj, pos),
522                                    page, CPT_TRANSIENT);
523
524                 if (IS_ERR(clp)) {
525                         rc = PTR_ERR(clp);
526                         break;
527                 }
528
529                 rc = cl_page_own(env, io, clp);
530                 if (rc) {
531                         LASSERT(clp->cp_state == CPS_FREEING);
532                         cl_page_put(env, clp);
533                         break;
534                 }
535
536                 cl_2queue_add(queue, clp);
537
538                 /* drop the reference count for cl_page_find, so that the page
539                  * will be freed in cl_2queue_fini. */
540                 cl_page_put(env, clp);
541
542                 cl_page_clip(env, clp, offset, offset+bytes);
543
544                 count -= bytes;
545                 pos += bytes;
546                 buf += bytes;
547
548                 group->lig_rwcount += bytes;
549                 ret_bytes += bytes;
550                 page++;
551         } while (count);
552
553         if (rc == 0) {
554                 enum cl_req_type iot;
555                 iot = io->ci_type == CIT_READ ? CRT_READ : CRT_WRITE;
556                 rc = cl_io_submit_sync(env, io, iot, queue, 0);
557         }
558
559         group->lig_rc = rc;
560
561         cl_2queue_discard(env, io, queue);
562         cl_2queue_disown(env, io, queue);
563         cl_2queue_fini(env, queue);
564
565         RETURN(ret_bytes);
566 }
567
568 static
569 struct llu_io_group * get_io_group(struct inode *inode, int maxpages,
570                                    struct lustre_rw_params *params)
571 {
572         struct llu_io_group *group;
573
574         OBD_ALLOC_PTR(group);
575         if (!group)
576                 return ERR_PTR(-ENOMEM);
577
578         group->lig_params = params;
579
580         return group;
581 }
582
583 static int max_io_pages(ssize_t len, int iovlen)
584 {
585         return (((len + CFS_PAGE_SIZE -1) / CFS_PAGE_SIZE) + 2 + iovlen - 1);
586 }
587
588 void put_io_group(struct llu_io_group *group)
589 {
590         OBD_FREE_PTR(group);
591 }
592
593 /**
594  * True, if \a io is a normal io, False for sendfile() / splice_{read|write}
595  */
596 int cl_is_normalio(const struct lu_env *env, const struct cl_io *io)
597 {
598         return 1;
599 }
600
601 static int slp_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
602 {
603         struct ccc_io     *cio   = cl2ccc_io(env, ios);
604         struct cl_io      *io    = ios->cis_io;
605         struct cl_object  *obj   = io->ci_obj;
606         struct inode      *inode = ccc_object_inode(obj);
607         int    err, ret;
608         loff_t pos;
609         long   cnt;
610         struct llu_io_group *iogroup;
611         struct lustre_rw_params p = {0};
612         int iovidx;
613         struct intnl_stat *st = llu_i2stat(inode);
614         struct llu_inode_info *lli = llu_i2info(inode);
615         struct llu_io_session *session = cl2slp_io(env, ios)->sio_session;
616         int write = io->ci_type == CIT_WRITE;
617         int exceed = 0;
618
619         CLOBINVRNT(env, obj, ccc_object_invariant(obj));
620
621         if (write) {
622                 pos = io->u.ci_wr.wr.crw_pos;
623                 cnt = io->u.ci_wr.wr.crw_count;
624         } else {
625                 pos = io->u.ci_rd.rd.crw_pos;
626                 cnt = io->u.ci_rd.rd.crw_count;
627         }
628         if (io->u.ci_wr.wr_append) {
629                 p.lrp_lock_mode = LCK_PW;
630         } else {
631                 p.lrp_brw_flags = OBD_BRW_SRVLOCK;
632                 p.lrp_lock_mode = LCK_NL;
633         }
634
635         iogroup = get_io_group(inode, max_io_pages(cnt, cio->cui_nrsegs), &p);
636         if (IS_ERR(iogroup))
637                 RETURN(PTR_ERR(iogroup));
638
639         err = ccc_prep_size(env, obj, io, pos, cnt, &exceed);
640         if (err != 0 || (write == 0 && exceed != 0))
641                 GOTO(out, err);
642
643         CDEBUG(D_INODE,
644                "%s ino %lu, %lu bytes, offset "LPU64", i_size "LPU64"\n",
645                write ? "Write" : "Read", (unsigned long)st->st_ino,
646                cnt, (__u64)pos, (__u64)st->st_size);
647
648         if (write && io->u.ci_wr.wr_append)
649                 pos = io->u.ci_wr.wr.crw_pos = st->st_size; /* XXX? Do we need to change io content too here? */
650                 /* XXX What about if one write syscall writes at 2 different offsets? */
651
652         for (iovidx = 0; iovidx < cio->cui_nrsegs; iovidx++) {
653                 char *buf = (char *) cio->cui_iov[iovidx].iov_base;
654                 long count = cio->cui_iov[iovidx].iov_len;
655
656                 if (!count)
657                         continue;
658                 if (cnt < count)
659                         count = cnt;
660                 if (IS_BAD_PTR(buf) || IS_BAD_PTR(buf + count)) {
661                         GOTO(out, err = -EFAULT);
662                 }
663
664                 if (io->ci_type == CIT_READ) {
665                         if (/* local_lock && */ pos >= st->st_size)
666                                 break;
667                 } else if (io->ci_type == CIT_WRITE) {
668                         if (pos >= lli->lli_maxbytes) {
669                                 GOTO(out, err = -EFBIG);
670                         }
671                         if (pos + count >= lli->lli_maxbytes)
672                                 count = lli->lli_maxbytes - pos;
673                 } else {
674                         LBUG();
675                 }
676
677                 ret = llu_queue_pio(env, io, iogroup, buf, count, pos);
678                 if (ret < 0) {
679                         GOTO(out, err = ret);
680                 } else {
681                         io->ci_nob += ret;
682                         pos += ret;
683                         cnt -= ret;
684                         if (io->ci_type == CIT_WRITE) {
685 //                                obd_adjust_kms(exp, lsm, pos, 0); // XXX
686                                 if (pos > st->st_size)
687                                         st->st_size = pos;
688                         }
689                         if (!cnt)
690                                 break;
691                 }
692         }
693         LASSERT(cnt == 0 || io->ci_type == CIT_READ); /* libsysio should guarantee this */
694
695         if (!iogroup->lig_rc)
696                 session->lis_rwcount += iogroup->lig_rwcount;
697         else if (!session->lis_rc)
698                 session->lis_rc = iogroup->lig_rc;
699         err = 0;
700
701 out:
702         put_io_group(iogroup);
703         return err;
704 }
705
706 static const struct cl_io_operations ccc_io_ops = {
707         .op = {
708                 [CIT_READ] = {
709                         .cio_fini      = ccc_io_fini,
710                         .cio_lock      = slp_io_rw_lock,
711                         .cio_start     = slp_io_start,
712                         .cio_end       = ccc_io_end,
713                         .cio_advance   = ccc_io_advance
714                 },
715                 [CIT_WRITE] = {
716                         .cio_fini      = ccc_io_fini,
717                         .cio_lock      = slp_io_rw_lock,
718                         .cio_start     = slp_io_start,
719                         .cio_end       = ccc_io_end,
720                         .cio_advance   = ccc_io_advance
721                 },
722                 [CIT_SETATTR] = {
723                         .cio_fini       = ccc_io_fini,
724                         .cio_iter_init  = slp_io_setattr_iter_init,
725                         .cio_start      = slp_io_setattr_start
726                 },
727                 [CIT_MISC] = {
728                         .cio_fini   = ccc_io_fini
729                 }
730         }
731 };
732
733 static struct slp_io *cl2slp_io(const struct lu_env *env,
734                                 const struct cl_io_slice *slice)
735 {
736         /* We call it just for assertion here */
737         cl2ccc_io(env, slice);
738
739         return slp_env_io(env);
740 }
741
742 /*****************************************************************************
743  *
744  * Temporary prototype thing: mirror obd-devices into cl devices.
745  *
746  */
747
748 int cl_sb_init(struct llu_sb_info *sbi)
749 {
750         struct cl_device  *cl;
751         struct lu_env     *env;
752         int rc = 0;
753         int refcheck;
754
755         env = cl_env_get(&refcheck);
756         if (IS_ERR(env))
757                 RETURN(PTR_ERR(env));
758
759         cl = cl_type_setup(env, NULL, &slp_device_type,
760                            sbi->ll_dt_exp->exp_obd->obd_lu_dev);
761         if (IS_ERR(cl))
762                 GOTO(out, rc = PTR_ERR(cl));
763
764         sbi->ll_cl = cl;
765         sbi->ll_site = cl2lu_dev(cl)->ld_site;
766 out:
767         cl_env_put(env, &refcheck);
768         RETURN(rc);
769 }
770
771 int cl_sb_fini(struct llu_sb_info *sbi)
772 {
773         struct lu_env *env;
774         int refcheck;
775
776         ENTRY;
777
778         env = cl_env_get(&refcheck);
779         if (IS_ERR(env))
780                 RETURN(PTR_ERR(env));
781
782         if (sbi->ll_cl != NULL) {
783                 cl_stack_fini(env, sbi->ll_cl);
784                 sbi->ll_cl = NULL;
785                 sbi->ll_site = NULL;
786         }
787         cl_env_put(env, &refcheck);
788         /*
789          * If mount failed (sbi->ll_cl == NULL), and this there are no other
790          * mounts, stop device types manually (this usually happens
791          * automatically when last device is destroyed).
792          */
793         lu_types_stop();
794         cl_env_cache_purge(~0);
795         RETURN(0);
796 }