Whamcloud - gitweb
b=19427 correct lmm_object_id and reserve fids for fid-on-OST.
[fs/lustre-release.git] / lustre / liblustre / llite_cl.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *   Copyright (c) 2007 Cluster File Systems, Inc.
5  *   Author: Nikita Danilov <nikita@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #define DEBUG_SUBSYSTEM S_LLITE
24
25 #include <stdlib.h>
26 #include <string.h>
27 #include <assert.h>
28 #include <time.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <fcntl.h>
32 #include <sys/queue.h>
33 #ifndef __CYGWIN__
34 # include <sys/statvfs.h>
35 #else
36 # include <sys/statfs.h>
37 #endif
38
39 #include <liblustre.h>
40
41 #include <obd.h>
42 #include <obd_support.h>
43 #include <lustre_fid.h>
44 #include <lustre_lite.h>
45 #include <lustre_dlm.h>
46 #include <lustre_ver.h>
47 #include <lustre_mdc.h>
48 #include <cl_object.h>
49
50 #include "llite_lib.h"
51
52 /*
53  * slp_ prefix stands for "Sysio Library Posix". It corresponds to historical
54  * "llu_" prefix.
55  */
56
57 static int   slp_type_init     (struct lu_device_type *t);
58 static void  slp_type_fini     (struct lu_device_type *t);
59
60 static struct cl_page * slp_page_init(const struct lu_env *env,
61                                      struct cl_object *obj,
62                                      struct cl_page *page, cfs_page_t *vmpage);
63 static int   slp_attr_get     (const struct lu_env *env, struct cl_object *obj,
64                                struct cl_attr *attr);
65
66 static struct lu_device  *slp_device_alloc(const struct lu_env *env,
67                                            struct lu_device_type *t,
68                                            struct lustre_cfg *cfg);
69
70 static int slp_io_init(const struct lu_env *env, struct cl_object *obj,
71                        struct cl_io *io);
72 static struct slp_io *cl2slp_io(const struct lu_env *env,
73                                 const struct cl_io_slice *slice);
74
75
76 static void llu_free_user_page(struct page *page);
77
78 static const struct lu_object_operations      slp_lu_obj_ops;
79 static const struct lu_device_operations      slp_lu_ops;
80 static const struct cl_device_operations      slp_cl_ops;
81 static const struct cl_io_operations          ccc_io_ops;
82 static const struct lu_device_type_operations slp_device_type_ops;
83              //struct lu_device_type            slp_device_type;
84 static const struct cl_page_operations        slp_page_ops;
85 static const struct cl_page_operations        slp_transient_page_ops;
86 static const struct cl_lock_operations        slp_lock_ops;
87
88
89 /*****************************************************************************
90  *
91  * Slp device and device type functions.
92  *
93  */
94
95 void *slp_session_key_init(const struct lu_context *ctx,
96                                   struct lu_context_key *key)
97 {
98         struct slp_session *session;
99
100         OBD_ALLOC_PTR(session);
101         if (session == NULL)
102                 session = ERR_PTR(-ENOMEM);
103         return session;
104 }
105
106 void slp_session_key_fini(const struct lu_context *ctx,
107                                  struct lu_context_key *key, void *data)
108 {
109         struct slp_session *session = data;
110         OBD_FREE_PTR(session);
111 }
112
113 struct lu_context_key slp_session_key = {
114         .lct_tags = LCT_SESSION,
115         .lct_init = slp_session_key_init,
116         .lct_fini = slp_session_key_fini
117 };
118
119 /* type constructor/destructor: slp_type_{init,fini,start,stop}(). */
120 LU_TYPE_INIT_FINI(slp, &ccc_key, &ccc_session_key, &slp_session_key);
121
122 static struct lu_device *slp_device_alloc(const struct lu_env *env,
123                                           struct lu_device_type *t,
124                                           struct lustre_cfg *cfg)
125 {
126         return ccc_device_alloc(env, t, cfg, &slp_lu_ops, &slp_cl_ops);
127 }
128
129 static int slp_lock_init(const struct lu_env *env,
130                          struct cl_object *obj, struct cl_lock *lock,
131                          const struct cl_io *io)
132 {
133         return ccc_lock_init(env, obj, lock, io, &slp_lock_ops);
134 }
135
136 static const struct cl_object_operations slp_ops = {
137         .coo_page_init = slp_page_init,
138         .coo_lock_init = slp_lock_init,
139         .coo_io_init   = slp_io_init,
140         .coo_attr_get  = slp_attr_get,
141         .coo_attr_set  = ccc_attr_set,
142         .coo_conf_set  = ccc_conf_set,
143         .coo_glimpse   = ccc_object_glimpse
144 };
145
146 static int slp_object_print(const struct lu_env *env, void *cookie,
147                             lu_printer_t p, const struct lu_object *o)
148 {
149         struct ccc_object *obj   = lu2ccc(o);
150         struct inode      *inode = obj->cob_inode;
151         struct intnl_stat *st = NULL;
152
153         if (inode)
154                 st = llu_i2stat(inode);
155
156         return (*p)(env, cookie, LUSTRE_SLP_NAME"-object@%p(%p:%lu/%u)",
157                     obj, inode,
158                     st ? (unsigned long)st->st_ino : 0UL,
159                     inode ? (unsigned int)llu_i2info(inode)->lli_st_generation
160                     : 0);
161 }
162
163 static const struct lu_object_operations slp_lu_obj_ops = {
164         .loo_object_init      = ccc_object_init,
165         .loo_object_start     = NULL,
166         .loo_object_delete    = NULL,
167         .loo_object_release   = NULL,
168         .loo_object_free      = ccc_object_free,
169         .loo_object_print     = slp_object_print,
170         .loo_object_invariant = NULL
171 };
172
173 static struct lu_object *slp_object_alloc(const struct lu_env *env,
174                                           const struct lu_object_header *hdr,
175                                           struct lu_device *dev)
176 {
177         return ccc_object_alloc(env, hdr, dev, &slp_ops, &slp_lu_obj_ops);
178 }
179
180 static const struct lu_device_operations slp_lu_ops = {
181         .ldo_object_alloc      = slp_object_alloc
182 };
183
184 static const struct cl_device_operations slp_cl_ops = {
185         .cdo_req_init = ccc_req_init
186 };
187
188 static const struct lu_device_type_operations slp_device_type_ops = {
189         .ldto_init = slp_type_init,
190         .ldto_fini = slp_type_fini,
191
192         .ldto_start = slp_type_start,
193         .ldto_stop  = slp_type_stop,
194
195         .ldto_device_alloc = slp_device_alloc,
196         .ldto_device_free  = ccc_device_free,
197         .ldto_device_init  = ccc_device_init,
198         .ldto_device_fini  = ccc_device_fini
199 };
200
201 struct lu_device_type slp_device_type = {
202         .ldt_tags     = LU_DEVICE_CL,
203         .ldt_name     = LUSTRE_SLP_NAME,
204         .ldt_ops      = &slp_device_type_ops,
205         .ldt_ctx_tags = LCT_CL_THREAD
206 };
207
208 int slp_global_init(void)
209 {
210         int result;
211
212         result = ccc_global_init(&slp_device_type);
213         return result;
214 }
215
216 void slp_global_fini(void)
217 {
218         ccc_global_fini(&slp_device_type);
219 }
220
221 /*****************************************************************************
222  *
223  * Object operations.
224  *
225  */
226
227 static struct cl_page *slp_page_init(const struct lu_env *env,
228                                      struct cl_object *obj,
229                                      struct cl_page *page, cfs_page_t *vmpage)
230 {
231         struct ccc_page *cpg;
232         int result;
233
234         CLOBINVRNT(env, obj, ccc_object_invariant(obj));
235
236         OBD_ALLOC_PTR(cpg);
237         if (cpg != NULL) {
238                 cpg->cpg_page = vmpage;
239
240                 if (page->cp_type == CPT_CACHEABLE) {
241                         LBUG();
242                 } else {
243                         struct ccc_object *clobj = cl2ccc(obj);
244
245                         cl_page_slice_add(page, &cpg->cpg_cl, obj,
246                                           &slp_transient_page_ops);
247                         clobj->cob_transient_pages++;
248                 }
249                 result = 0;
250         } else
251                 result = -ENOMEM;
252         return ERR_PTR(result);
253 }
254
255 static int slp_io_init(const struct lu_env *env, struct cl_object *obj,
256                        struct cl_io *io)
257 {
258         struct ccc_io      *vio   = ccc_env_io(env);
259         int result = 0;
260
261         CLOBINVRNT(env, obj, ccc_object_invariant(obj));
262
263         cl_io_slice_add(io, &vio->cui_cl, obj, &ccc_io_ops);
264         if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE) {
265                 size_t count;
266
267                 count = io->u.ci_rw.crw_count;
268                 /* "If nbyte is 0, read() will return 0 and have no other
269                  *  results."  -- Single Unix Spec */
270                 if (count == 0)
271                         result = 1;
272                 else {
273                         vio->cui_tot_count = count;
274                         vio->cui_tot_nrsegs = 0;
275                 }
276
277         }
278         return result;
279 }
280
281 static int slp_attr_get(const struct lu_env *env, struct cl_object *obj,
282                         struct cl_attr *attr)
283 {
284         struct inode *inode = ccc_object_inode(obj);
285         struct intnl_stat *st = llu_i2stat(inode);
286
287         attr->cat_size = st->st_size;
288         attr->cat_blocks = st->st_blocks;
289         attr->cat_mtime  = st->st_mtime;
290         attr->cat_atime  = st->st_atime;
291         attr->cat_ctime  = st->st_ctime;
292         /* KMS is not known by this layer */
293         return 0; /* layers below have to fill in the rest */
294 }
295
296 /*****************************************************************************
297  *
298  * Page operations.
299  *
300  */
301
302 static void slp_page_fini_common(struct ccc_page *cp)
303 {
304         cfs_page_t *vmpage = cp->cpg_page;
305
306         LASSERT(vmpage != NULL);
307         llu_free_user_page(vmpage);
308         OBD_FREE_PTR(cp);
309 }
310
311 static void slp_page_completion_common(const struct lu_env *env,
312                                        struct ccc_page *cp, int ioret)
313 {
314         LASSERT(cp->cpg_cl.cpl_page->cp_sync_io != NULL);
315 }
316
317 static void slp_page_completion_read(const struct lu_env *env,
318                                      const struct cl_page_slice *slice,
319                                      int ioret)
320 {
321         struct ccc_page *cp      = cl2ccc_page(slice);
322         ENTRY;
323
324         slp_page_completion_common(env, cp, ioret);
325
326         EXIT;
327 }
328
329 static void slp_page_completion_write_common(const struct lu_env *env,
330                                              const struct cl_page_slice *slice,
331                                              int ioret)
332 {
333         struct ccc_page *cp     = cl2ccc_page(slice);
334
335         if (ioret == 0) {
336                 cp->cpg_write_queued = 0;
337                 /*
338                  * Only ioret == 0, write succeed, then this page could be
339                  * deleted from the pending_writing count.
340                  */
341         }
342         slp_page_completion_common(env, cp, ioret);
343 }
344
345 static int slp_page_is_vmlocked(const struct lu_env *env,
346                                 const struct cl_page_slice *slice)
347 {
348         return -EBUSY;
349 }
350
351 static void slp_transient_page_fini(const struct lu_env *env,
352                                     struct cl_page_slice *slice)
353 {
354         struct ccc_page *cp = cl2ccc_page(slice);
355         struct cl_page *clp = slice->cpl_page;
356         struct ccc_object *clobj = cl2ccc(clp->cp_obj);
357
358         slp_page_fini_common(cp);
359         clobj->cob_transient_pages--;
360 }
361
362
363 static const struct cl_page_operations slp_transient_page_ops = {
364         .cpo_own           = ccc_transient_page_own,
365         .cpo_assume        = ccc_transient_page_assume,
366         .cpo_unassume      = ccc_transient_page_unassume,
367         .cpo_disown        = ccc_transient_page_disown,
368         .cpo_discard       = ccc_transient_page_discard,
369         .cpo_vmpage        = ccc_page_vmpage,
370         .cpo_is_vmlocked   = slp_page_is_vmlocked,
371         .cpo_fini          = slp_transient_page_fini,
372         .cpo_is_under_lock = ccc_page_is_under_lock,
373         .io = {
374                 [CRT_READ] = {
375                         .cpo_completion  = slp_page_completion_read,
376                 },
377                 [CRT_WRITE] = {
378                         .cpo_completion  = slp_page_completion_write_common,
379                 }
380         }
381 };
382
383 /*****************************************************************************
384  *
385  * Lock operations.
386  *
387  */
388
389 static int slp_lock_enqueue(const struct lu_env *env,
390                            const struct cl_lock_slice *slice,
391                            struct cl_io *unused, __u32 enqflags)
392 {
393         CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
394
395         liblustre_wait_event(0);
396         return 0;
397 }
398
399 static const struct cl_lock_operations slp_lock_ops = {
400         .clo_delete    = ccc_lock_delete,
401         .clo_fini      = ccc_lock_fini,
402         .clo_enqueue   = slp_lock_enqueue,
403         .clo_wait      = ccc_lock_wait,
404         .clo_unuse     = ccc_lock_unuse,
405         .clo_fits_into = ccc_lock_fits_into,
406 };
407
408 /*****************************************************************************
409  *
410  * io operations.
411  *
412  */
413
414 static int slp_io_rw_lock(const struct lu_env *env,
415                           const struct cl_io_slice *ios)
416 {
417         struct ccc_io *cio = ccc_env_io(env);
418         struct cl_io *io   = ios->cis_io;
419         loff_t start;
420         loff_t end;
421
422         if (cl_io_is_append(io)) {
423                 start = 0;
424                 end   = OBD_OBJECT_EOF;
425         } else {
426                 start = io->u.ci_wr.wr.crw_pos;
427                 end   = start + io->u.ci_wr.wr.crw_count - 1;
428         }
429
430         ccc_io_update_iov(env, cio, io);
431
432         /*
433          * This acquires real DLM lock only in O_APPEND case, because of
434          * the io->ci_lockreq setting in llu_io_init().
435          */
436         LASSERT(ergo(cl_io_is_append(io), io->ci_lockreq == CILR_MANDATORY));
437         LASSERT(ergo(!cl_io_is_append(io), io->ci_lockreq == CILR_NEVER));
438         return ccc_io_one_lock(env, io, 0,
439                                io->ci_type == CIT_READ ? CLM_READ : CLM_WRITE,
440                                start, end);
441
442 }
443
444 static int slp_io_setattr_iter_init(const struct lu_env *env,
445                                     const struct cl_io_slice *ios)
446 {
447         return 0;
448 }
449
450 static int slp_io_setattr_start(const struct lu_env *env,
451                                 const struct cl_io_slice *ios)
452 {
453         return 0;
454 }
455
456 static struct page *llu_get_user_page(int index, void *addr, int offset,
457                                       int count)
458 {
459         struct page *page;
460
461         OBD_ALLOC_PTR(page);
462         if (!page)
463                 return NULL;
464         page->index = index;
465         page->addr = addr;
466         page->_offset = offset;
467         page->_count = count;
468
469         CFS_INIT_LIST_HEAD(&page->list);
470         CFS_INIT_LIST_HEAD(&page->_node);
471
472         return page;
473 }
474
475 static void llu_free_user_page(struct page *page)
476 {
477         OBD_FREE_PTR(page);
478 }
479
480
481 static int llu_queue_pio(const struct lu_env *env, struct cl_io *io,
482                          struct llu_io_group *group,
483                          char *buf, size_t count, loff_t pos)
484 {
485         struct cl_object *obj = io->ci_obj;
486         struct inode *inode = ccc_object_inode(obj);
487         struct intnl_stat *st = llu_i2stat(inode);
488         struct obd_export *exp = llu_i2obdexp(inode);
489         struct page *page;
490         int  rc = 0, ret_bytes = 0;
491         int local_lock;
492         struct cl_page *clp;
493         struct cl_2queue *queue;
494         ENTRY;
495
496         if (!exp)
497                 RETURN(-EINVAL);
498
499         local_lock = group->lig_params->lrp_lock_mode != LCK_NL;
500
501         queue = &io->ci_queue;
502         cl_2queue_init(queue);
503
504
505         /* prepare the pages array */
506         do {
507                 unsigned long index, offset, bytes;
508
509                 offset = (pos & ~CFS_PAGE_MASK);
510                 index = pos >> CFS_PAGE_SHIFT;
511                 bytes = CFS_PAGE_SIZE - offset;
512                 if (bytes > count)
513                         bytes = count;
514
515                 /* prevent read beyond file range */
516                 if (/* local_lock && */
517                     io->ci_type == CIT_READ && pos + bytes >= st->st_size) {
518                         if (pos >= st->st_size)
519                                 break;
520                         bytes = st->st_size - pos;
521                 }
522
523                 /* prepare page for this index */
524                 page = llu_get_user_page(index, buf - offset, offset, bytes);
525                 if (!page) {
526                         rc = -ENOMEM;
527                         break;
528                 }
529
530                 clp = cl_page_find(env, obj,
531                                    cl_index(obj, pos),
532                                    page, CPT_TRANSIENT);
533
534                 if (IS_ERR(clp)) {
535                         rc = PTR_ERR(clp);
536                         break;
537                 }
538
539                 rc = cl_page_own(env, io, clp);
540                 if (rc) {
541                         LASSERT(clp->cp_state == CPS_FREEING);
542                         cl_page_put(env, clp);
543                         break;
544                 }
545
546                 cl_2queue_add(queue, clp);
547
548                 /* drop the reference count for cl_page_find, so that the page
549                  * will be freed in cl_2queue_fini. */
550                 cl_page_put(env, clp);
551
552                 cl_page_clip(env, clp, offset, offset+bytes);
553
554                 count -= bytes;
555                 pos += bytes;
556                 buf += bytes;
557
558                 group->lig_rwcount += bytes;
559                 ret_bytes += bytes;
560                 page++;
561         } while (count);
562
563         if (rc == 0) {
564                 enum cl_req_type iot;
565                 iot = io->ci_type == CIT_READ ? CRT_READ : CRT_WRITE;
566                 rc = cl_io_submit_sync(env, io, iot, queue, CRP_NORMAL, 0);
567         }
568
569         group->lig_rc = rc;
570
571         cl_2queue_discard(env, io, queue);
572         cl_2queue_disown(env, io, queue);
573         cl_2queue_fini(env, queue);
574
575         RETURN(ret_bytes);
576 }
577
578 static
579 struct llu_io_group * get_io_group(struct inode *inode, int maxpages,
580                                    struct lustre_rw_params *params)
581 {
582         struct llu_io_group *group;
583
584         OBD_ALLOC_PTR(group);
585         if (!group)
586                 return ERR_PTR(-ENOMEM);
587
588         group->lig_params = params;
589
590         return group;
591 }
592
593 static int max_io_pages(ssize_t len, int iovlen)
594 {
595         return (((len + CFS_PAGE_SIZE -1) / CFS_PAGE_SIZE) + 2 + iovlen - 1);
596 }
597
598 void put_io_group(struct llu_io_group *group)
599 {
600         OBD_FREE_PTR(group);
601 }
602
603 /**
604  * True, if \a io is a normal io, False for sendfile() / splice_{read|write}
605  */
606 int cl_is_normalio(const struct lu_env *env, const struct cl_io *io)
607 {
608         return 1;
609 }
610
611 static int slp_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
612 {
613         struct ccc_io     *cio   = cl2ccc_io(env, ios);
614         struct cl_io      *io    = ios->cis_io;
615         struct cl_object  *obj   = io->ci_obj;
616         struct inode      *inode = ccc_object_inode(obj);
617         int    err, ret;
618         loff_t pos;
619         long   cnt;
620         struct llu_io_group *iogroup;
621         struct lustre_rw_params p = {0};
622         int iovidx;
623         struct intnl_stat *st = llu_i2stat(inode);
624         struct llu_inode_info *lli = llu_i2info(inode);
625         struct llu_io_session *session = cl2slp_io(env, ios)->sio_session;
626         int write = io->ci_type == CIT_WRITE;
627         int exceed = 0;
628
629         CLOBINVRNT(env, obj, ccc_object_invariant(obj));
630
631         if (write) {
632                 pos = io->u.ci_wr.wr.crw_pos;
633                 cnt = io->u.ci_wr.wr.crw_count;
634         } else {
635                 pos = io->u.ci_rd.rd.crw_pos;
636                 cnt = io->u.ci_rd.rd.crw_count;
637         }
638         if (io->u.ci_wr.wr_append) {
639                 p.lrp_lock_mode = LCK_PW;
640         } else {
641                 p.lrp_brw_flags = OBD_BRW_SRVLOCK;
642                 p.lrp_lock_mode = LCK_NL;
643         }
644
645         iogroup = get_io_group(inode, max_io_pages(cnt, cio->cui_nrsegs), &p);
646         if (IS_ERR(iogroup))
647                 RETURN(PTR_ERR(iogroup));
648
649         err = ccc_prep_size(env, obj, io, pos, cnt, 0, &exceed);
650         if (err != 0 || (write == 0 && exceed != 0))
651                 GOTO(out, err);
652
653         CDEBUG(D_INODE,
654                "%s ino %lu, %lu bytes, offset "LPU64", i_size "LPU64"\n",
655                write ? "Write" : "Read", (unsigned long)st->st_ino,
656                cnt, (__u64)pos, (__u64)st->st_size);
657
658         if (write && io->u.ci_wr.wr_append)
659                 pos = io->u.ci_wr.wr.crw_pos = st->st_size; /* XXX? Do we need to change io content too here? */
660                 /* XXX What about if one write syscall writes at 2 different offsets? */
661
662         for (iovidx = 0; iovidx < cio->cui_nrsegs; iovidx++) {
663                 char *buf = (char *) cio->cui_iov[iovidx].iov_base;
664                 long count = cio->cui_iov[iovidx].iov_len;
665
666                 if (!count)
667                         continue;
668                 if (cnt < count)
669                         count = cnt;
670                 if (IS_BAD_PTR(buf) || IS_BAD_PTR(buf + count)) {
671                         GOTO(out, err = -EFAULT);
672                 }
673
674                 if (io->ci_type == CIT_READ) {
675                         if (/* local_lock && */ pos >= st->st_size)
676                                 break;
677                 } else if (io->ci_type == CIT_WRITE) {
678                         if (pos >= lli->lli_maxbytes) {
679                                 GOTO(out, err = -EFBIG);
680                         }
681                         if (pos + count >= lli->lli_maxbytes)
682                                 count = lli->lli_maxbytes - pos;
683                 } else {
684                         LBUG();
685                 }
686
687                 ret = llu_queue_pio(env, io, iogroup, buf, count, pos);
688                 if (ret < 0) {
689                         GOTO(out, err = ret);
690                 } else {
691                         io->ci_nob += ret;
692                         pos += ret;
693                         cnt -= ret;
694                         if (io->ci_type == CIT_WRITE) {
695 //                                obd_adjust_kms(exp, lsm, pos, 0); // XXX
696                                 if (pos > st->st_size)
697                                         st->st_size = pos;
698                         }
699                         if (!cnt)
700                                 break;
701                 }
702         }
703         LASSERT(cnt == 0 || io->ci_type == CIT_READ); /* libsysio should guarantee this */
704
705         if (!iogroup->lig_rc)
706                 session->lis_rwcount += iogroup->lig_rwcount;
707         else if (!session->lis_rc)
708                 session->lis_rc = iogroup->lig_rc;
709         err = 0;
710
711 out:
712         put_io_group(iogroup);
713         return err;
714 }
715
716 static const struct cl_io_operations ccc_io_ops = {
717         .op = {
718                 [CIT_READ] = {
719                         .cio_fini      = ccc_io_fini,
720                         .cio_lock      = slp_io_rw_lock,
721                         .cio_start     = slp_io_start,
722                         .cio_end       = ccc_io_end,
723                         .cio_advance   = ccc_io_advance
724                 },
725                 [CIT_WRITE] = {
726                         .cio_fini      = ccc_io_fini,
727                         .cio_lock      = slp_io_rw_lock,
728                         .cio_start     = slp_io_start,
729                         .cio_end       = ccc_io_end,
730                         .cio_advance   = ccc_io_advance
731                 },
732                 [CIT_SETATTR] = {
733                         .cio_fini       = ccc_io_fini,
734                         .cio_iter_init  = slp_io_setattr_iter_init,
735                         .cio_start      = slp_io_setattr_start
736                 },
737                 [CIT_MISC] = {
738                         .cio_fini   = ccc_io_fini
739                 }
740         }
741 };
742
743 static struct slp_io *cl2slp_io(const struct lu_env *env,
744                                 const struct cl_io_slice *slice)
745 {
746         /* We call it just for assertion here */
747         cl2ccc_io(env, slice);
748
749         return slp_env_io(env);
750 }
751
752 /*****************************************************************************
753  *
754  * Temporary prototype thing: mirror obd-devices into cl devices.
755  *
756  */
757
758 int cl_sb_init(struct llu_sb_info *sbi)
759 {
760         struct cl_device  *cl;
761         struct lu_env     *env;
762         int rc = 0;
763         int refcheck;
764
765         env = cl_env_get(&refcheck);
766         if (IS_ERR(env))
767                 RETURN(PTR_ERR(env));
768
769         cl = cl_type_setup(env, NULL, &slp_device_type,
770                            sbi->ll_dt_exp->exp_obd->obd_lu_dev);
771         if (IS_ERR(cl))
772                 GOTO(out, rc = PTR_ERR(cl));
773
774         sbi->ll_cl = cl;
775         sbi->ll_site = cl2lu_dev(cl)->ld_site;
776 out:
777         cl_env_put(env, &refcheck);
778         RETURN(rc);
779 }
780
781 int cl_sb_fini(struct llu_sb_info *sbi)
782 {
783         struct lu_env *env;
784         int refcheck;
785
786         ENTRY;
787
788         env = cl_env_get(&refcheck);
789         if (IS_ERR(env))
790                 RETURN(PTR_ERR(env));
791
792         if (sbi->ll_cl != NULL) {
793                 cl_stack_fini(env, sbi->ll_cl);
794                 sbi->ll_cl = NULL;
795                 sbi->ll_site = NULL;
796         }
797         cl_env_put(env, &refcheck);
798         /*
799          * If mount failed (sbi->ll_cl == NULL), and this there are no other
800          * mounts, stop device types manually (this usually happens
801          * automatically when last device is destroyed).
802          */
803         lu_types_stop();
804         cl_env_cache_purge(~0);
805         RETURN(0);
806 }