/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *   Copyright (c) 2007 Cluster File Systems, Inc.
 *   Author: Nikita Danilov <nikita@clusterfs.com>
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#define DEBUG_SUBSYSTEM S_LLITE

#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/queue.h>
#ifndef __CYGWIN__
# include <sys/statvfs.h>
#else
# include <sys/statfs.h>
#endif

#include <sysio.h>
#ifdef HAVE_XTIO_H
#include <xtio.h>
#endif
#include <fs.h>
#include <mount.h>
#include <inode.h>
#ifdef HAVE_FILE_H
#include <file.h>
#endif
#include <liblustre.h>

#include <obd.h>
#include <obd_support.h>
#include <lustre_fid.h>
#include <lustre_lite.h>
#include <lustre_dlm.h>
#include <lustre_ver.h>
#include <lustre_mdc.h>
#include <cl_object.h>

#include "llite_lib.h"

/*
 * The slp_ prefix stands for "Sysio Library Posix"; it corresponds to the
 * historical "llu_" prefix.
 */

static int   slp_type_init(struct lu_device_type *t);
static void  slp_type_fini(struct lu_device_type *t);

static struct cl_page *slp_page_init(const struct lu_env *env,
                                     struct cl_object *obj,
                                     struct cl_page *page, cfs_page_t *vmpage);
static int slp_attr_get(const struct lu_env *env, struct cl_object *obj,
                        struct cl_attr *attr);

static struct lu_device *slp_device_alloc(const struct lu_env *env,
                                          struct lu_device_type *t,
                                          struct lustre_cfg *cfg);

static int slp_io_init(const struct lu_env *env, struct cl_object *obj,
                       struct cl_io *io);
static struct slp_io *cl2slp_io(const struct lu_env *env,
                                const struct cl_io_slice *slice);

static void llu_free_user_page(struct page *page);

static const struct lu_object_operations      slp_lu_obj_ops;
static const struct lu_device_operations      slp_lu_ops;
static const struct cl_device_operations      slp_cl_ops;
static const struct cl_io_operations          ccc_io_ops;
static const struct lu_device_type_operations slp_device_type_ops;
static const struct cl_page_operations        slp_page_ops;
static const struct cl_page_operations        slp_transient_page_ops;
static const struct cl_lock_operations        slp_lock_ops;

/*****************************************************************************
 *
 * Slp device and device type functions.
 *
 */

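/*
 * Per-session state for the slp layer.  The LCT_SESSION tag makes
 * lu_context instantiate this key in every session context; the init/fini
 * callbacks below just allocate and free a struct slp_session.
 */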
void *slp_session_key_init(const struct lu_context *ctx,
                           struct lu_context_key *key)
{
        struct slp_session *session;

        OBD_ALLOC_PTR(session);
        if (session == NULL)
                session = ERR_PTR(-ENOMEM);
        return session;
}

void slp_session_key_fini(const struct lu_context *ctx,
                          struct lu_context_key *key, void *data)
{
        struct slp_session *session = data;
        OBD_FREE_PTR(session);
}

struct lu_context_key slp_session_key = {
        .lct_tags = LCT_SESSION,
        .lct_init = slp_session_key_init,
        .lct_fini = slp_session_key_fini
};

/* type constructor/destructor: slp_type_{init,fini,start,stop}(). */
LU_TYPE_INIT_FINI(slp, &ccc_key, &ccc_session_key, &slp_session_key);

static struct lu_device *slp_device_alloc(const struct lu_env *env,
                                          struct lu_device_type *t,
                                          struct lustre_cfg *cfg)
{
        return ccc_device_alloc(env, t, cfg, &slp_lu_ops, &slp_cl_ops);
}

static int slp_lock_init(const struct lu_env *env,
                         struct cl_object *obj, struct cl_lock *lock,
                         const struct cl_io *io)
{
        return ccc_lock_init(env, obj, lock, io, &slp_lock_ops);
}

static const struct cl_object_operations slp_ops = {
        .coo_page_init = slp_page_init,
        .coo_lock_init = slp_lock_init,
        .coo_io_init   = slp_io_init,
        .coo_attr_get  = slp_attr_get,
        .coo_attr_set  = ccc_attr_set,
        .coo_conf_set  = ccc_conf_set,
        .coo_glimpse   = ccc_object_glimpse
};

static int slp_object_print(const struct lu_env *env, void *cookie,
                            lu_printer_t p, const struct lu_object *o)
{
        struct ccc_object *obj   = lu2ccc(o);
        struct inode      *inode = obj->cob_inode;
        struct intnl_stat *st    = NULL;

        if (inode)
                st = llu_i2stat(inode);

        return (*p)(env, cookie, LUSTRE_SLP_NAME"-object@%p(%p:%lu/%u)",
                    obj, inode,
                    st ? (unsigned long)st->st_ino : 0UL,
                    inode ? (unsigned int)llu_i2info(inode)->lli_st_generation
                          : 0);
}

static const struct lu_object_operations slp_lu_obj_ops = {
        .loo_object_init      = ccc_object_init,
        .loo_object_start     = NULL,
        .loo_object_delete    = NULL,
        .loo_object_release   = NULL,
        .loo_object_free      = ccc_object_free,
        .loo_object_print     = slp_object_print,
        .loo_object_invariant = NULL
};

static struct lu_object *slp_object_alloc(const struct lu_env *env,
                                          const struct lu_object_header *hdr,
                                          struct lu_device *dev)
{
        return ccc_object_alloc(env, hdr, dev, &slp_ops, &slp_lu_obj_ops);
}

static const struct lu_device_operations slp_lu_ops = {
        .ldo_object_alloc = slp_object_alloc
};

static const struct cl_device_operations slp_cl_ops = {
        .cdo_req_init = ccc_req_init
};

static const struct lu_device_type_operations slp_device_type_ops = {
        .ldto_init = slp_type_init,
        .ldto_fini = slp_type_fini,

        .ldto_start = slp_type_start,
        .ldto_stop  = slp_type_stop,

        .ldto_device_alloc = slp_device_alloc,
        .ldto_device_free  = ccc_device_free,
        .ldto_device_init  = ccc_device_init,
        .ldto_device_fini  = ccc_device_fini
};

struct lu_device_type slp_device_type = {
        .ldt_tags     = LU_DEVICE_CL,
        .ldt_name     = LUSTRE_SLP_NAME,
        .ldt_ops      = &slp_device_type_ops,
        .ldt_ctx_tags = LCT_CL_THREAD
};

int slp_global_init(void)
{
        int result;

        result = ccc_global_init(&slp_device_type);
        return result;
}

void slp_global_fini(void)
{
        ccc_global_fini(&slp_device_type);
}

/*****************************************************************************
 *
 * Object operations.
 *
 */

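/*
 * Attach the slp page slice.  liblustre only ever creates transient pages
 * (a user-space client has no page cache), so a CPT_CACHEABLE page here
 * is a bug and triggers LBUG().
 */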
static struct cl_page *slp_page_init(const struct lu_env *env,
                                     struct cl_object *obj,
                                     struct cl_page *page, cfs_page_t *vmpage)
{
        struct ccc_page *cpg;
        int result;

        CLOBINVRNT(env, obj, ccc_object_invariant(obj));

        OBD_ALLOC_PTR(cpg);
        if (cpg != NULL) {
                cpg->cpg_page = vmpage;

                if (page->cp_type == CPT_CACHEABLE) {
                        LBUG();
                } else {
                        struct ccc_object *clobj = cl2ccc(obj);

                        cl_page_slice_add(page, &cpg->cpg_cl, obj,
                                          &slp_transient_page_ops);
                        clobj->cob_transient_pages++;
                }
                result = 0;
        } else
                result = -ENOMEM;
        return ERR_PTR(result);
}

static int slp_io_init(const struct lu_env *env, struct cl_object *obj,
                       struct cl_io *io)
{
        struct ccc_io *vio = ccc_env_io(env);
        int result = 0;

        CLOBINVRNT(env, obj, ccc_object_invariant(obj));

        cl_io_slice_add(io, &vio->cui_cl, obj, &ccc_io_ops);
        if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE) {
                size_t count;

                count = io->u.ci_rw.crw_count;
                /* "If nbyte is 0, read() will return 0 and have no other
                 *  results."  -- Single Unix Spec */
                if (count == 0)
                        result = 1;
                else {
                        vio->cui_tot_count = count;
                        vio->cui_tot_nrsegs = 0;
                }
        }
        return result;
}

static int slp_attr_get(const struct lu_env *env, struct cl_object *obj,
                        struct cl_attr *attr)
{
        struct inode *inode = ccc_object_inode(obj);
        struct intnl_stat *st = llu_i2stat(inode);

        attr->cat_size   = st->st_size;
        attr->cat_blocks = st->st_blocks;
        attr->cat_mtime  = st->st_mtime;
        attr->cat_atime  = st->st_atime;
        attr->cat_ctime  = st->st_ctime;
        /* KMS is not known by this layer */
        return 0; /* layers below have to fill in the rest */
}

/*****************************************************************************
 *
 * Page operations.
 *
 */

static void slp_page_fini_common(struct ccc_page *cp)
{
        cfs_page_t *vmpage = cp->cpg_page;

        LASSERT(vmpage != NULL);
        llu_free_user_page(vmpage);
        OBD_FREE_PTR(cp);
}

static void slp_page_completion_common(const struct lu_env *env,
                                       struct ccc_page *cp, int ioret)
{
        struct cl_sync_io *anchor = cp->cpg_sync_io;

        if (anchor) {
                cp->cpg_sync_io = NULL;
                cl_sync_io_note(anchor, ioret);
        } else {
                LBUG();
        }
}

static void slp_page_completion_read(const struct lu_env *env,
                                     const struct cl_page_slice *slice,
                                     int ioret)
{
        struct ccc_page *cp = cl2ccc_page(slice);
        ENTRY;

        slp_page_completion_common(env, cp, ioret);

        EXIT;
}

static void slp_page_completion_write_common(const struct lu_env *env,
                                             const struct cl_page_slice *slice,
                                             int ioret)
{
        struct ccc_page *cp = cl2ccc_page(slice);

        if (ioret == 0) {
                /*
                 * Only when ioret == 0, i.e. the write succeeded, can this
                 * page be removed from the pending-write count.
                 */
                cp->cpg_write_queued = 0;
        }
        slp_page_completion_common(env, cp, ioret);
}

static int slp_page_is_vmlocked(const struct lu_env *env,
                                const struct cl_page_slice *slice)
{
        return -EBUSY;
}

static void slp_transient_page_fini(const struct lu_env *env,
                                    struct cl_page_slice *slice)
{
        struct ccc_page *cp = cl2ccc_page(slice);
        struct cl_page *clp = slice->cpl_page;
        struct ccc_object *clobj = cl2ccc(clp->cp_obj);

        slp_page_fini_common(cp);
        clobj->cob_transient_pages--;
}

static const struct cl_page_operations slp_transient_page_ops = {
        .cpo_own           = ccc_transient_page_own,
        .cpo_assume        = ccc_transient_page_assume,
        .cpo_unassume      = ccc_transient_page_unassume,
        .cpo_disown        = ccc_transient_page_disown,
        .cpo_discard       = ccc_transient_page_discard,
        .cpo_vmpage        = ccc_page_vmpage,
        .cpo_is_vmlocked   = slp_page_is_vmlocked,
        .cpo_fini          = slp_transient_page_fini,
        .cpo_is_under_lock = ccc_page_is_under_lock,
        .io = {
                [CRT_READ] = {
                        .cpo_completion  = slp_page_completion_read,
                },
                [CRT_WRITE] = {
                        .cpo_completion  = slp_page_completion_write_common,
                }
        }
};

/*****************************************************************************
 *
 * Lock operations.
 *
 */

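/*
 * Top-layer lock enqueue.  A real DLM lock is taken only in the O_APPEND
 * case (see the ci_lockreq handling in slp_io_rw_lock() below); here we
 * just drain pending liblustre events and let the layers below do any
 * actual enqueueing.
 */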
static int slp_lock_enqueue(const struct lu_env *env,
                            const struct cl_lock_slice *slice,
                            struct cl_io *unused, __u32 enqflags)
{
        CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));

        liblustre_wait_event(0);
        return 0;
}

static const struct cl_lock_operations slp_lock_ops = {
        .clo_delete    = ccc_lock_delete,
        .clo_fini      = ccc_lock_fini,
        .clo_enqueue   = slp_lock_enqueue,
        .clo_wait      = ccc_lock_wait,
        .clo_unuse     = ccc_lock_unuse,
        .clo_fits_into = ccc_lock_fits_into,
};

/*****************************************************************************
 *
 * io operations.
 *
 */

static int slp_io_rw_lock(const struct lu_env *env,
                          const struct cl_io_slice *ios)
{
        struct ccc_io *cio = ccc_env_io(env);
        struct cl_io *io   = ios->cis_io;
        loff_t start;
        loff_t end;

        if (cl_io_is_append(io)) {
                start = 0;
                end   = OBD_OBJECT_EOF;
        } else {
                start = io->u.ci_wr.wr.crw_pos;
                end   = start + io->u.ci_wr.wr.crw_count - 1;
        }

        ccc_io_update_iov(env, cio, io);

        /*
         * This acquires a real DLM lock only in the O_APPEND case, because
         * of the io->ci_lockreq setting in llu_io_init().
         */
        LASSERT(ergo(cl_io_is_append(io), io->ci_lockreq == CILR_MANDATORY));
        LASSERT(ergo(!cl_io_is_append(io), io->ci_lockreq == CILR_NEVER));
        return ccc_io_one_lock(env, io, 0,
                               io->ci_type == CIT_READ ? CLM_READ : CLM_WRITE,
                               start, end);
}

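/*
 * Truncate has nothing to do at the slp layer; the layers below carry out
 * the actual size change, so both callbacks simply report success.
 */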
static int slp_io_trunc_iter_init(const struct lu_env *env,
                                  const struct cl_io_slice *ios)
{
        return 0;
}

static int slp_io_trunc_start(const struct lu_env *env,
                              const struct cl_io_slice *ios)
{
        return 0;
}

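/*
 * liblustre runs entirely in user space, so there are no VM pages here.
 * A "struct page" is just a small descriptor wrapping a chunk of the user
 * buffer: addr, index, _offset and _count describe the covered byte range.
 */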
static struct page *llu_get_user_page(int index, void *addr, int offset,
                                      int count)
{
        struct page *page;

        OBD_ALLOC_PTR(page);
        if (!page)
                return NULL;
        page->index   = index;
        page->addr    = addr;
        page->_offset = offset;
        page->_count  = count;

        CFS_INIT_LIST_HEAD(&page->list);
        CFS_INIT_LIST_HEAD(&page->_node);

        return page;
}

static void llu_free_user_page(struct page *page)
{
        OBD_FREE_PTR(page);
}

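/*
 * Carve the user buffer [buf, buf + count) starting at file offset pos
 * into page-sized chunks, wrap each chunk in a transient cl_page and
 * queue it on io->ci_queue.  All pages share one cl_sync_io anchor, so
 * the whole queue can be submitted and waited on as a unit.  Returns the
 * number of bytes queued; the overall i/o result is stored in
 * group->lig_rc.
 */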
static int llu_queue_pio(const struct lu_env *env, struct cl_io *io,
                         struct llu_io_group *group,
                         char *buf, size_t count, loff_t pos)
{
        struct cl_object *obj = io->ci_obj;
        struct inode *inode = ccc_object_inode(obj);
        struct intnl_stat *st = llu_i2stat(inode);
        struct obd_export *exp = llu_i2obdexp(inode);
        struct page *page;
        int  rc = 0, npages = 0, ret_bytes = 0;
        int local_lock;
        struct cl_page *clp;
        struct ccc_page *clup;
        struct cl_2queue *queue;
        struct cl_sync_io *anchor = &ccc_env_info(env)->cti_sync_io;
        ENTRY;

        if (!exp)
                RETURN(-EINVAL);

        local_lock = group->lig_params->lrp_lock_mode != LCK_NL;

        queue = &io->ci_queue;
        cl_2queue_init(queue);

        /* prepare the pages array */
        do {
                unsigned long index, offset, bytes;

                offset = (pos & ~CFS_PAGE_MASK);
                index = pos >> CFS_PAGE_SHIFT;
                bytes = CFS_PAGE_SIZE - offset;
                if (bytes > count)
                        bytes = count;

                /* prevent read beyond file range */
                if (/* local_lock && */
                    io->ci_type == CIT_READ && pos + bytes >= st->st_size) {
                        if (pos >= st->st_size)
                                break;
                        bytes = st->st_size - pos;
                }

                /* prepare page for this index */
                page = llu_get_user_page(index, buf - offset, offset, bytes);
                if (!page) {
                        rc = -ENOMEM;
                        break;
                }

                clp = cl_page_find(env, obj,
                                   cl_index(obj, pos),
                                   page, CPT_TRANSIENT);

                if (IS_ERR(clp)) {
                        rc = PTR_ERR(clp);
                        break;
                }

                rc = cl_page_own(env, io, clp);
                if (rc) {
                        LASSERT(clp->cp_state == CPS_FREEING);
                        cl_page_put(env, clp);
                        break;
                }

                clup = cl2ccc_page(cl_page_at(clp, &slp_device_type));
                clup->cpg_sync_io = anchor;
                cl_2queue_add(queue, clp);

                /* drop the reference count for cl_page_find, so that the page
                 * will be freed in cl_2queue_fini. */
                cl_page_put(env, clp);

                cl_page_clip(env, clp, offset, offset + bytes);

                npages++;
                count -= bytes;
                pos += bytes;
                buf += bytes;

                group->lig_rwcount += bytes;
                ret_bytes += bytes;
        } while (count);

        cl_sync_io_init(anchor, npages);

        if (rc == 0) {
                enum cl_req_type crt;

                crt = io->ci_type == CIT_READ ? CRT_READ : CRT_WRITE;
                rc = cl_io_submit_rw(env, io, crt, queue, CRP_NORMAL);
                if (rc == 0) {
                        /* If some pages weren't sent for any reason, count
                         * them as completed, to avoid an infinite wait. */
                        cl_page_list_for_each(clp, &queue->c2_qin) {
                                CL_PAGE_DEBUG(D_ERROR, env, clp,
                                              "not completed\n");
                                cl_sync_io_note(anchor, +1);
                        }
                        /* wait for the IO to be finished. */
                        rc = cl_sync_io_wait(env, io, &queue->c2_qout, anchor);
                }
        }

        group->lig_rc = rc;

        cl_2queue_discard(env, io, queue);
        cl_2queue_disown(env, io, queue);
        cl_2queue_fini(env, queue);

        RETURN(ret_bytes);
}

static
struct llu_io_group *get_io_group(struct inode *inode, int maxpages,
                                  struct lustre_rw_params *params)
{
        struct llu_io_group *group;

        OBD_ALLOC_PTR(group);
        if (!group)
                return ERR_PTR(-ENOMEM);

        group->lig_params = params;

        return group;
}

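/*
 * Upper bound on the number of pages an i/o of len bytes spread over
 * iovlen segments can touch: enough pages to cover len bytes plus slack
 * for misalignment at segment boundaries.
 */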
static int max_io_pages(ssize_t len, int iovlen)
{
        return (((len + CFS_PAGE_SIZE - 1) / CFS_PAGE_SIZE) + 2 + iovlen - 1);
}

void put_io_group(struct llu_io_group *group)
{
        OBD_FREE_PTR(group);
}

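/*
 * cio_start() for reads and writes: walk the user iovec, clamp each
 * segment against the file size (reads) and lli_maxbytes (writes), hand
 * it to llu_queue_pio() and accumulate the result into the i/o session.
 * O_APPEND writes are repositioned to the current file size first.
 */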
static int slp_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
{
        struct ccc_io    *cio   = cl2ccc_io(env, ios);
        struct cl_io     *io    = ios->cis_io;
        struct cl_object *obj   = io->ci_obj;
        struct inode     *inode = ccc_object_inode(obj);
        int     err, ret;
        loff_t  pos;
        size_t  cnt;
        struct llu_io_group *iogroup;
        struct lustre_rw_params p = {0};
        int iovidx;
        struct intnl_stat *st = llu_i2stat(inode);
        struct llu_inode_info *lli = llu_i2info(inode);
        struct llu_io_session *session = cl2slp_io(env, ios)->sio_session;
        int write = io->ci_type == CIT_WRITE;
        int exceed = 0;

        CLOBINVRNT(env, obj, ccc_object_invariant(obj));

        if (write) {
                pos = io->u.ci_wr.wr.crw_pos;
                cnt = io->u.ci_wr.wr.crw_count;
        } else {
                pos = io->u.ci_rd.rd.crw_pos;
                cnt = io->u.ci_rd.rd.crw_count;
        }
        if (io->u.ci_wr.wr_append) {
                p.lrp_lock_mode = LCK_PW;
        } else {
                p.lrp_brw_flags = OBD_BRW_SRVLOCK;
                p.lrp_lock_mode = LCK_NL;
        }

        iogroup = get_io_group(inode, max_io_pages(cnt, cio->cui_nrsegs), &p);
        if (IS_ERR(iogroup))
                RETURN(PTR_ERR(iogroup));

        err = ccc_prep_size(env, obj, io, pos, cnt, 0, &exceed);
        if (err != 0 || (write == 0 && exceed != 0))
                GOTO(out, err);

        CDEBUG(D_INODE,
               "%s ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
               write ? "Write" : "Read", (unsigned long)st->st_ino,
               cnt, (__u64)pos, (__u64)st->st_size);

        if (write && io->u.ci_wr.wr_append)
                /* XXX? Do we need to change the io content too here?
                 * XXX What if one write syscall writes at 2 different
                 * offsets? */
                pos = io->u.ci_wr.wr.crw_pos = st->st_size;

        for (iovidx = 0; iovidx < cio->cui_nrsegs; iovidx++) {
                char *buf = (char *)cio->cui_iov[iovidx].iov_base;
                size_t count = cio->cui_iov[iovidx].iov_len;

                if (!count)
                        continue;
                if (cnt < count)
                        count = cnt;
                if (IS_BAD_PTR(buf) || IS_BAD_PTR(buf + count))
                        GOTO(out, err = -EFAULT);

                if (io->ci_type == CIT_READ) {
                        if (/* local_lock && */ pos >= st->st_size)
                                break;
                } else if (io->ci_type == CIT_WRITE) {
                        if (pos >= lli->lli_maxbytes)
                                GOTO(out, err = -EFBIG);
                        if (pos + count >= lli->lli_maxbytes)
                                count = lli->lli_maxbytes - pos;
                } else {
                        LBUG();
                }

                ret = llu_queue_pio(env, io, iogroup, buf, count, pos);
                if (ret < 0) {
                        GOTO(out, err = ret);
                } else {
                        io->ci_nob += ret;
                        pos += ret;
                        cnt -= ret;
                        if (io->ci_type == CIT_WRITE) {
//                                obd_adjust_kms(exp, lsm, pos, 0); // XXX
                                if (pos > st->st_size)
                                        st->st_size = pos;
                        }
                        if (!cnt)
                                break;
                }
        }
        /* libsysio should guarantee this */
        LASSERT(cnt == 0 || io->ci_type == CIT_READ);

        if (!iogroup->lig_rc)
                session->lis_rwcount += iogroup->lig_rwcount;
        else if (!session->lis_rc)
                session->lis_rc = iogroup->lig_rc;
        err = 0;

out:
        put_io_group(iogroup);
        return err;
}

static const struct cl_io_operations ccc_io_ops = {
        .op = {
                [CIT_READ] = {
                        .cio_fini      = ccc_io_fini,
                        .cio_lock      = slp_io_rw_lock,
                        .cio_start     = slp_io_start,
                        .cio_end       = ccc_io_end,
                        .cio_advance   = ccc_io_advance
                },
                [CIT_WRITE] = {
                        .cio_fini      = ccc_io_fini,
                        .cio_lock      = slp_io_rw_lock,
                        .cio_start     = slp_io_start,
                        .cio_end       = ccc_io_end,
                        .cio_advance   = ccc_io_advance
                },
                [CIT_TRUNC] = {
                        .cio_fini      = ccc_io_fini,
                        .cio_iter_init = slp_io_trunc_iter_init,
                        .cio_start     = slp_io_trunc_start
                },
                [CIT_MISC] = {
                        .cio_fini = ccc_io_fini
                }
        }
};

static struct slp_io *cl2slp_io(const struct lu_env *env,
                                const struct cl_io_slice *slice)
{
        /* We call it just for its assertions. */
        cl2ccc_io(env, slice);

        return slp_env_io(env);
}

/*****************************************************************************
 *
 * Temporary prototype thing: mirror obd-devices into cl devices.
 *
 */

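/*
 * Called at mount time: set up the client ("cl") device stack on top of
 * the data device's lu-device and cache the resulting cl_device and
 * lu_site in the super-block info.
 */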
int cl_sb_init(struct llu_sb_info *sbi)
{
        struct cl_device *cl;
        struct lu_env    *env;
        int rc = 0;
        int refcheck;

        env = cl_env_get(&refcheck);
        if (IS_ERR(env))
                RETURN(PTR_ERR(env));

        cl = cl_type_setup(env, NULL, &slp_device_type,
                           sbi->ll_dt_exp->exp_obd->obd_lu_dev);
        if (IS_ERR(cl))
                GOTO(out, rc = PTR_ERR(cl));

        sbi->ll_cl = cl;
        sbi->ll_site = cl2lu_dev(cl)->ld_site;
out:
        cl_env_put(env, &refcheck);
        RETURN(rc);
}

int cl_sb_fini(struct llu_sb_info *sbi)
{
        struct lu_env *env;
        int refcheck;

        ENTRY;

        env = cl_env_get(&refcheck);
        if (IS_ERR(env))
                RETURN(PTR_ERR(env));

        if (sbi->ll_cl != NULL) {
                cl_stack_fini(env, sbi->ll_cl);
                sbi->ll_cl = NULL;
                sbi->ll_site = NULL;
        }
        cl_env_put(env, &refcheck);
        /*
         * If the mount failed (sbi->ll_cl == NULL) and there are no other
         * mounts, stop the device types manually (this usually happens
         * automatically when the last device is destroyed).
         */
        lu_types_stop();
        cl_env_cache_purge(~0);
        RETURN(0);
}