/*
 * Copyright (c) 2007 Cluster File Systems, Inc.
 * Author: Nikita Danilov <nikita@clusterfs.com>
 *
 * This file is part of Lustre, http://www.lustre.org.
 *
 * Lustre is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Lustre is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Lustre; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 * Copyright (c) 2011, 2013, Intel Corporation.
 */

#define DEBUG_SUBSYSTEM S_LLITE

#include <sys/types.h>

#include <libcfs/libcfs.h>
#include <lustre/lustre_idl.h>
#include <liblustre.h>

#include <cl_object.h>
#include <lustre_export.h>
#include <lustre_lite.h>

#include <obd_support.h>
#include "llite_lib.h"

/*
 * slp_ prefix stands for "Sysio Library Posix". It corresponds to the
 * historical "llu_" prefix.
 */

static int  slp_type_init(struct lu_device_type *t);
static void slp_type_fini(struct lu_device_type *t);

static int slp_page_init(const struct lu_env *env, struct cl_object *obj,
                         struct cl_page *page, pgoff_t index);
static int slp_attr_get(const struct lu_env *env, struct cl_object *obj,
                        struct cl_attr *attr);

static struct lu_device *slp_device_alloc(const struct lu_env *env,
                                          struct lu_device_type *t,
                                          struct lustre_cfg *cfg);

static int slp_io_init(const struct lu_env *env, struct cl_object *obj,
                       struct cl_io *io);
static struct slp_io *cl2slp_io(const struct lu_env *env,
                                const struct cl_io_slice *slice);

static void llu_free_user_page(struct page *page);

static const struct lu_object_operations      slp_lu_obj_ops;
static const struct lu_device_operations      slp_lu_ops;
static const struct cl_device_operations      slp_cl_ops;
static const struct cl_io_operations          ccc_io_ops;
static const struct lu_device_type_operations slp_device_type_ops;
//struct lu_device_type slp_device_type;
static const struct cl_page_operations        slp_transient_page_ops;
static const struct cl_lock_operations        slp_lock_ops;

/*****************************************************************************
 *
 * Slp device and device type functions.
 *
 */

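/*
 * Per-session state of the slp layer: slp_session_key_init()/fini() below
 * allocate and free one struct slp_session per lu_context carrying the
 * LCT_SESSION tag.
 */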
static void *slp_session_key_init(const struct lu_context *ctx,
                                  struct lu_context_key *key)
{
        struct slp_session *session;

        OBD_ALLOC_PTR(session);
        if (session == NULL)
                session = ERR_PTR(-ENOMEM);
        return session;
}

static void slp_session_key_fini(const struct lu_context *ctx,
                                 struct lu_context_key *key, void *data)
{
        struct slp_session *session = data;

        OBD_FREE_PTR(session);
}

struct lu_context_key slp_session_key = {
        .lct_tags = LCT_SESSION,
        .lct_init = slp_session_key_init,
        .lct_fini = slp_session_key_fini
};

/* type constructor/destructor: slp_type_{init,fini,start,stop}(). */
LU_TYPE_INIT_FINI(slp, &ccc_key, &ccc_session_key, &slp_session_key);

static struct lu_device *slp_device_alloc(const struct lu_env *env,
                                          struct lu_device_type *t,
                                          struct lustre_cfg *cfg)
{
        return ccc_device_alloc(env, t, cfg, &slp_lu_ops, &slp_cl_ops);
}

static int slp_lock_init(const struct lu_env *env,
                         struct cl_object *obj, struct cl_lock *lock,
                         const struct cl_io *io)
{
        return ccc_lock_init(env, obj, lock, io, &slp_lock_ops);
}

static const struct cl_object_operations slp_ops = {
        .coo_page_init = slp_page_init,
        .coo_lock_init = slp_lock_init,
        .coo_io_init   = slp_io_init,
        .coo_attr_get  = slp_attr_get,
        .coo_attr_set  = ccc_attr_set,
        .coo_conf_set  = ccc_conf_set,
        .coo_glimpse   = ccc_object_glimpse
};

static int slp_object_print(const struct lu_env *env, void *cookie,
                            lu_printer_t p, const struct lu_object *o)
{
        struct ccc_object *obj   = lu2ccc(o);
        struct inode      *inode = obj->cob_inode;
        struct intnl_stat *st    = NULL;

        if (inode)
                st = llu_i2stat(inode);

        return (*p)(env, cookie, LUSTRE_SLP_NAME"-object@%p(%p:%lu/%u)",
                    obj, inode,
                    st ? (unsigned long)st->st_ino : 0UL,
                    inode ? (unsigned int)llu_i2info(inode)->lli_st_generation
                          : 0);
}

static const struct lu_object_operations slp_lu_obj_ops = {
        .loo_object_init      = ccc_object_init,
        .loo_object_start     = NULL,
        .loo_object_delete    = NULL,
        .loo_object_release   = NULL,
        .loo_object_free      = ccc_object_free,
        .loo_object_print     = slp_object_print,
        .loo_object_invariant = NULL
};

static struct lu_object *slp_object_alloc(const struct lu_env *env,
                                          const struct lu_object_header *hdr,
                                          struct lu_device *dev)
{
        return ccc_object_alloc(env, hdr, dev, &slp_ops, &slp_lu_obj_ops);
}

static const struct lu_device_operations slp_lu_ops = {
        .ldo_object_alloc = slp_object_alloc
};

static const struct cl_device_operations slp_cl_ops = {
        .cdo_req_init = ccc_req_init
};

static const struct lu_device_type_operations slp_device_type_ops = {
        .ldto_init = slp_type_init,
        .ldto_fini = slp_type_fini,

        .ldto_start = slp_type_start,
        .ldto_stop  = slp_type_stop,

        .ldto_device_alloc = slp_device_alloc,
        .ldto_device_free  = ccc_device_free,
        .ldto_device_init  = ccc_device_init,
        .ldto_device_fini  = ccc_device_fini
};

static struct lu_device_type slp_device_type = {
        .ldt_tags     = LU_DEVICE_CL,
        .ldt_name     = LUSTRE_SLP_NAME,
        .ldt_ops      = &slp_device_type_ops,
        .ldt_ctx_tags = LCT_CL_THREAD
};

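/* Register/unregister the slp device type with the cl framework. */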
int slp_global_init(void)
{
        int result;

        result = ccc_global_init(&slp_device_type);
        return result;
}

void slp_global_fini(void)
{
        ccc_global_fini(&slp_device_type);
}

/*****************************************************************************
 *
 * Object operations.
 *
 */

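/*
 * Note that liblustre creates only transient (CPT_TRANSIENT) pages (see
 * llu_queue_pio() below), so the CPT_CACHEABLE branch is never taken.
 */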
static int slp_page_init(const struct lu_env *env, struct cl_object *obj,
                         struct cl_page *page, pgoff_t index)
{
        struct ccc_page *cpg = cl_object_page_slice(obj, page);

        CLOBINVRNT(env, obj, ccc_object_invariant(obj));

        cpg->cpg_page = page->cp_vmpage;

        if (page->cp_type == CPT_CACHEABLE) {
                LBUG();
        } else {
                struct ccc_object *clobj = cl2ccc(obj);

                cl_page_slice_add(page, &cpg->cpg_cl, obj, index,
                                  &slp_transient_page_ops);
                clobj->cob_transient_pages++;
        }
        return 0;
}

static int slp_io_init(const struct lu_env *env, struct cl_object *obj,
                       struct cl_io *io)
{
        struct ccc_io *vio = ccc_env_io(env);
        int result = 0;

        CLOBINVRNT(env, obj, ccc_object_invariant(obj));

        cl_io_slice_add(io, &vio->cui_cl, obj, &ccc_io_ops);
        if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE) {
                size_t count;

                count = io->u.ci_rw.crw_count;
                /* "If nbyte is 0, read() will return 0 and have no other
                 * results." -- Single Unix Spec */
                if (count == 0)
                        result = 1;
                else {
                        vio->cui_tot_count = count;
                        vio->cui_tot_nrsegs = 0;
                }
        }
        return result;
}

static int slp_attr_get(const struct lu_env *env, struct cl_object *obj,
                        struct cl_attr *attr)
{
        struct inode *inode = ccc_object_inode(obj);
        struct intnl_stat *st = llu_i2stat(inode);

        attr->cat_size   = st->st_size;
        attr->cat_blocks = st->st_blocks;
        attr->cat_mtime  = st->st_mtime;
        attr->cat_atime  = st->st_atime;
        attr->cat_ctime  = st->st_ctime;
        /* KMS is not known by this layer */
        return 0; /* layers below have to fill in the rest */
}

/*****************************************************************************
 *
 * Page operations.
 *
 */

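/* Free the userspace page wrapper backing a transient cl_page. */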
static void slp_page_fini_common(struct ccc_page *cp)
{
        struct page *vmpage = cp->cpg_page;

        LASSERT(vmpage != NULL);
        llu_free_user_page(vmpage);
}

static void slp_page_completion_common(const struct lu_env *env,
                                       struct ccc_page *cp, int ioret)
{
        LASSERT(cp->cpg_cl.cpl_page->cp_sync_io != NULL);
}

static void slp_page_completion_read(const struct lu_env *env,
                                     const struct cl_page_slice *slice,
                                     int ioret)
{
        struct ccc_page *cp = cl2ccc_page(slice);

        slp_page_completion_common(env, cp, ioret);
}

static void slp_page_completion_write_common(const struct lu_env *env,
                                             const struct cl_page_slice *slice,
                                             int ioret)
{
        struct ccc_page *cp = cl2ccc_page(slice);

        if (ioret == 0) {
                cp->cpg_write_queued = 0;
                /*
                 * Only if ioret == 0, i.e. the write succeeded, may this
                 * page be deleted from the pending_writing count.
                 */
        }
        slp_page_completion_common(env, cp, ioret);
}

static int slp_page_is_vmlocked(const struct lu_env *env,
                                const struct cl_page_slice *slice)
{
        return -EBUSY;
}

static void slp_transient_page_fini(const struct lu_env *env,
                                    struct cl_page_slice *slice)
{
        struct ccc_page *cp = cl2ccc_page(slice);
        struct cl_page *clp = slice->cpl_page;
        struct ccc_object *clobj = cl2ccc(clp->cp_obj);

        slp_page_fini_common(cp);
        clobj->cob_transient_pages--;
}

static const struct cl_page_operations slp_transient_page_ops = {
        .cpo_own         = ccc_transient_page_own,
        .cpo_assume      = ccc_transient_page_assume,
        .cpo_unassume    = ccc_transient_page_unassume,
        .cpo_disown      = ccc_transient_page_disown,
        .cpo_discard     = ccc_transient_page_discard,
        .cpo_is_vmlocked = slp_page_is_vmlocked,
        .cpo_fini        = slp_transient_page_fini,
        .io = {
                [CRT_READ] = {
                        .cpo_completion = slp_page_completion_read,
                },
                [CRT_WRITE] = {
                        .cpo_completion = slp_page_completion_write_common,
                },
        }
};

/*****************************************************************************
 *
 * Lock operations.
 *
 */

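/*
 * The slp layer contributes nothing to lock enqueue beyond draining pending
 * liblustre events (liblustre_wait_event(0)).
 */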
static int slp_lock_enqueue(const struct lu_env *env,
                            const struct cl_lock_slice *slice,
                            struct cl_io *unused, __u32 enqflags)
{
        CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));

        liblustre_wait_event(0);
        return 0;
}

static const struct cl_lock_operations slp_lock_ops = {
        .clo_delete    = ccc_lock_delete,
        .clo_fini      = ccc_lock_fini,
        .clo_enqueue   = slp_lock_enqueue,
        .clo_wait      = ccc_lock_wait,
        .clo_unuse     = ccc_lock_unuse,
        .clo_fits_into = ccc_lock_fits_into,
};

/*****************************************************************************
 *
 * io operations.
 *
 */

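/*
 * Build the extent lock request for a read/write io: O_APPEND writes lock
 * the whole file [0, OBD_OBJECT_EOF]; all other cases use CILR_NEVER (set
 * in llu_io_init()), so no DLM lock is actually taken.
 */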
static int slp_io_rw_lock(const struct lu_env *env,
                          const struct cl_io_slice *ios)
{
        struct ccc_io *cio = ccc_env_io(env);
        struct cl_io  *io  = ios->cis_io;
        loff_t start;
        loff_t end;

        if (cl_io_is_append(io)) {
                start = 0;
                end   = OBD_OBJECT_EOF;
        } else {
                start = io->u.ci_wr.wr.crw_pos;
                end   = start + io->u.ci_wr.wr.crw_count - 1;
        }

        ccc_io_update_iov(env, cio, io);

        /*
         * This acquires real DLM lock only in O_APPEND case, because of
         * the io->ci_lockreq setting in llu_io_init().
         */
        LASSERT(ergo(cl_io_is_append(io), io->ci_lockreq == CILR_MANDATORY));
        LASSERT(ergo(!cl_io_is_append(io), io->ci_lockreq == CILR_NEVER));

        return ccc_io_one_lock(env, io, 0,
                               io->ci_type == CIT_READ ? CLM_READ : CLM_WRITE,
                               start, end);
}

static int slp_io_setattr_iter_init(const struct lu_env *env,
                                    const struct cl_io_slice *ios)
{
        return 0;
}

static int slp_io_setattr_start(const struct lu_env *env,
                                const struct cl_io_slice *ios)
{
        return 0;
}

static struct page *llu_get_user_page(int index, void *addr, int offset,
                                      int count)
{
        struct page *page;

        OBD_ALLOC_PTR(page);
        if (!page)
                return NULL;
        page->index   = index;
        page->addr    = addr;
        page->_offset = offset;
        page->_count  = count;

        CFS_INIT_LIST_HEAD(&page->list);
        CFS_INIT_LIST_HEAD(&page->_node);
        return page;
}

static void llu_free_user_page(struct page *page)
{
        OBD_FREE_PTR(page);
}

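/*
 * Wrap the user buffer [buf, buf + count) at file offset pos into transient
 * cl_pages, add them to the io's 2queue and submit them synchronously.
 * Returns the number of bytes queued, or a negative errno on error.
 */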
static int llu_queue_pio(const struct lu_env *env, struct cl_io *io,
                         struct llu_io_group *group,
                         char *buf, size_t count, loff_t pos)
{
        struct cl_object *obj = io->ci_obj;
        struct inode *inode = ccc_object_inode(obj);
        struct intnl_stat *st = llu_i2stat(inode);
        struct obd_export *exp = llu_i2obdexp(inode);
        struct page *page;
        struct cl_page *clp;
        struct cl_2queue *queue;
        int rc = 0, ret_bytes = 0;

        if (!exp)
                return -EINVAL;

        queue = &io->ci_queue;
        cl_2queue_init(queue);

        /* prepare the pages array */
        do {
                unsigned long index, offset, bytes;

                offset = (pos & ~CFS_PAGE_MASK);
                index = pos >> PAGE_CACHE_SHIFT;
                bytes = PAGE_CACHE_SIZE - offset;
                if (bytes > count)
                        bytes = count;

                /* prevent read beyond file range */
                if (/* local_lock && */
                    io->ci_type == CIT_READ && pos + bytes >= st->st_size) {
                        if (pos >= st->st_size)
                                break;
                        bytes = st->st_size - pos;
                }

                /* prepare page for this index */
                page = llu_get_user_page(index, buf - offset, offset, bytes);
                if (!page) {
                        rc = -ENOMEM;
                        break;
                }

                clp = cl_page_find(env, obj,
                                   cl_index(obj, pos & CFS_PAGE_MASK),
                                   page, CPT_TRANSIENT);
                if (IS_ERR(clp)) {
                        rc = PTR_ERR(clp);
                        break;
                }

                rc = cl_page_own(env, io, clp);
                if (rc) {
                        LASSERT(clp->cp_state == CPS_FREEING);
                        cl_page_put(env, clp);
                        break;
                }

                cl_2queue_add(queue, clp);

                /* drop the reference count for cl_page_find, so that the page
                 * will be freed in cl_2queue_fini. */
                cl_page_put(env, clp);

                cl_page_clip(env, clp, offset, offset + bytes);

                ret_bytes += bytes;
                group->lig_rwcount += bytes;

                /* advance to the next page */
                count -= bytes;
                pos += bytes;
                buf += bytes;
        } while (count);

        if (rc == 0) {
                enum cl_req_type iot;
                iot = io->ci_type == CIT_READ ? CRT_READ : CRT_WRITE;
                rc = cl_io_submit_sync(env, io, iot, queue, 0);
        }

        group->lig_rc = rc;

        cl_2queue_discard(env, io, queue);
        cl_2queue_disown(env, io, queue);
        cl_2queue_fini(env, queue);

        return ret_bytes;
}

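/*
 * An llu_io_group accumulates the byte count and return code of the pages
 * queued for one buffer; slp_io_start() folds it into the llu_io_session.
 */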
struct llu_io_group *get_io_group(struct inode *inode, int maxpages)
{
        struct llu_io_group *group;

        OBD_ALLOC_PTR(group);
        if (!group)
                return ERR_PTR(-ENOMEM);
        return group;
}

static int max_io_pages(ssize_t len, int iovlen)
{
        return ((len + PAGE_CACHE_SIZE - 1) / PAGE_CACHE_SIZE) +
                2 + iovlen - 1;
}

void put_io_group(struct llu_io_group *group)
{
        OBD_FREE_PTR(group);
}

/**
 * True, if \a io is a normal io, False for sendfile() / splice_{read|write}
 */
int cl_is_normalio(const struct lu_env *env, const struct cl_io *io)
{
        /* liblustre has no sendfile()/splice() paths. */
        return 1;
}

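/*
 * Main io start method: walk the user iovec, push each segment through
 * llu_queue_pio() and accumulate the results into the io session.
 */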
static int slp_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
{
        struct ccc_io *cio = cl2ccc_io(env, ios);
        struct cl_io *io = ios->cis_io;
        struct cl_object *obj = io->ci_obj;
        struct inode *inode = ccc_object_inode(obj);
        int err, ret;
        loff_t pos;
        long cnt;
        struct llu_io_group *iogroup;
        int iovidx;
        struct intnl_stat *st = llu_i2stat(inode);
        struct llu_inode_info *lli = llu_i2info(inode);
        struct llu_io_session *session = cl2slp_io(env, ios)->sio_session;
        int write = io->ci_type == CIT_WRITE;
        int exceed = 0;

        CLOBINVRNT(env, obj, ccc_object_invariant(obj));

        if (write) {
                pos = io->u.ci_wr.wr.crw_pos;
                cnt = io->u.ci_wr.wr.crw_count;
        } else {
                pos = io->u.ci_rd.rd.crw_pos;
                cnt = io->u.ci_rd.rd.crw_count;
        }

        iogroup = get_io_group(inode, max_io_pages(cnt, cio->cui_nrsegs));
        if (IS_ERR(iogroup))
                RETURN(PTR_ERR(iogroup));

        err = ccc_prep_size(env, obj, io, pos, cnt, &exceed);
        if (err != 0 || (write == 0 && exceed != 0))
                GOTO(out, err);

        CDEBUG(D_INODE,
               "%s ino %lu, %lu bytes, offset "LPU64", i_size "LPU64"\n",
               write ? "Write" : "Read", (unsigned long)st->st_ino,
               cnt, (__u64)pos, (__u64)st->st_size);

        if (write && io->u.ci_wr.wr_append)
                pos = io->u.ci_wr.wr.crw_pos = st->st_size; /* XXX? Do we need to change io content too here? */
        /* XXX What about if one write syscall writes at 2 different offsets? */

        for (iovidx = 0; iovidx < cio->cui_nrsegs; iovidx++) {
                char *buf = (char *) cio->cui_iov[iovidx].iov_base;
                long count = cio->cui_iov[iovidx].iov_len;

                if (!count)
                        continue;
                if (cnt < count)
                        count = cnt;
                if (IS_BAD_PTR(buf) || IS_BAD_PTR(buf + count)) {
                        GOTO(out, err = -EFAULT);
                }

                if (io->ci_type == CIT_READ) {
                        if (/* local_lock && */ pos >= st->st_size)
                                break;
                } else if (io->ci_type == CIT_WRITE) {
                        if (pos >= lli->lli_maxbytes) {
                                GOTO(out, err = -EFBIG);
                        }
                        if (pos + count >= lli->lli_maxbytes)
                                count = lli->lli_maxbytes - pos;
                }

                ret = llu_queue_pio(env, io, iogroup, buf, count, pos);
                if (ret < 0) {
                        GOTO(out, err = ret);
                } else {
                        io->ci_nob += ret;
                        pos += ret;
                        cnt -= ret;
                }
        }

        if (io->ci_type == CIT_WRITE) {
                // obd_adjust_kms(exp, lsm, pos, 0); // XXX
                if (pos > st->st_size)
                        st->st_size = pos;
        }
        LASSERT(cnt == 0 || io->ci_type == CIT_READ); /* libsysio should guarantee this */

        if (!iogroup->lig_rc)
                session->lis_rwcount += iogroup->lig_rwcount;
        else if (!session->lis_rc)
                session->lis_rc = iogroup->lig_rc;

        err = 0;

out:
        put_io_group(iogroup);
        return err;
}

static const struct cl_io_operations ccc_io_ops = {
        .op = {
                [CIT_READ] = {
                        .cio_fini    = ccc_io_fini,
                        .cio_lock    = slp_io_rw_lock,
                        .cio_start   = slp_io_start,
                        .cio_end     = ccc_io_end,
                        .cio_advance = ccc_io_advance
                },
                [CIT_WRITE] = {
                        .cio_fini    = ccc_io_fini,
                        .cio_lock    = slp_io_rw_lock,
                        .cio_start   = slp_io_start,
                        .cio_end     = ccc_io_end,
                        .cio_advance = ccc_io_advance
                },
                [CIT_SETATTR] = {
                        .cio_fini      = ccc_io_fini,
                        .cio_iter_init = slp_io_setattr_iter_init,
                        .cio_start     = slp_io_setattr_start
                },
                [CIT_MISC] = {
                        .cio_fini = ccc_io_fini
                }
        }
};

static struct slp_io *cl2slp_io(const struct lu_env *env,
                                const struct cl_io_slice *slice)
{
        /* We call it just for assertion here */
        cl2ccc_io(env, slice);

        return slp_env_io(env);
}

/*****************************************************************************
 *
 * Temporary prototype thing: mirror obd-devices into cl devices.
 *
 */

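/*
 * Set up the cl-device stack on top of the data export of a liblustre
 * super block and record the resulting lu_site.
 */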
int cl_sb_init(struct llu_sb_info *sbi)
{
        struct cl_device *cl;
        struct lu_env *env;
        int rc = 0, refcheck;

        env = cl_env_get(&refcheck);
        if (IS_ERR(env))
                RETURN(PTR_ERR(env));

        cl = cl_type_setup(env, NULL, &slp_device_type,
                           sbi->ll_dt_exp->exp_obd->obd_lu_dev);
        if (IS_ERR(cl))
                GOTO(out, rc = PTR_ERR(cl));

        sbi->ll_cl = cl;
        sbi->ll_site = cl2lu_dev(cl)->ld_site;
out:
        cl_env_put(env, &refcheck);
        RETURN(rc);
}

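/* Tear down the cl-device stack created by cl_sb_init(). */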
int cl_sb_fini(struct llu_sb_info *sbi)
{
        struct lu_env *env;
        int refcheck;

        env = cl_env_get(&refcheck);
        if (IS_ERR(env))
                RETURN(PTR_ERR(env));

        if (sbi->ll_cl != NULL) {
                cl_stack_fini(env, sbi->ll_cl);
                sbi->ll_cl = NULL;
                sbi->ll_site = NULL;
        }
        cl_env_put(env, &refcheck);
        cl_env_cache_purge(~0);
        RETURN(0);
}