1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2007 Cluster File Systems, Inc.
5 * Author: Nikita Danilov <nikita@clusterfs.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 #define DEBUG_SUBSYSTEM S_LLITE
29 #include <sys/types.h>
32 #include <sys/queue.h>
34 # include <sys/statvfs.h>
36 # include <sys/statfs.h>
49 #include <liblustre.h>
52 #include <obd_support.h>
53 #include <lustre_fid.h>
54 #include <lustre_lite.h>
55 #include <lustre_dlm.h>
56 #include <lustre_ver.h>
57 #include <lustre_mdc.h>
58 #include <cl_object.h>
60 #include "llite_lib.h"
63 * slp_ prefix stands for "Sysio Library Posix". It corresponds to historical
/*
 * Forward declarations of the static functions and operation tables used
 * by the slp layer, so that they can be referenced ahead of the
 * definitions that appear further down in this file.
 */
67 static int slp_type_init (struct lu_device_type *t);
68 static void slp_type_fini (struct lu_device_type *t);
70 static struct cl_page * slp_page_init(const struct lu_env *env,
71 struct cl_object *obj,
72 struct cl_page *page, cfs_page_t *vmpage);
73 static int slp_attr_get (const struct lu_env *env, struct cl_object *obj,
74 struct cl_attr *attr);
76 static struct lu_device *slp_device_alloc(const struct lu_env *env,
77 struct lu_device_type *t,
78 struct lustre_cfg *cfg);
80 static int slp_io_init(const struct lu_env *env, struct cl_object *obj,
82 static struct slp_io *cl2slp_io(const struct lu_env *env,
83 const struct cl_io_slice *slice);
86 static void llu_free_user_page(struct page *page);
/* Operation tables (object, device, io, device-type, page and lock). */
88 static const struct lu_object_operations slp_lu_obj_ops;
89 static const struct lu_device_operations slp_lu_ops;
90 static const struct cl_device_operations slp_cl_ops;
91 static const struct cl_io_operations ccc_io_ops;
92 static const struct lu_device_type_operations slp_device_type_ops;
93 //struct lu_device_type slp_device_type;
94 static const struct cl_page_operations slp_page_ops;
95 static const struct cl_page_operations slp_transient_page_ops;
96 static const struct cl_lock_operations slp_lock_ops;
99 /*****************************************************************************
101 * Slp device and device type functions.
/*
 * lct_init callback of slp_session_key: allocates a struct slp_session
 * with OBD_ALLOC_PTR().
 *
 * ERR_PTR(-ENOMEM) is substituted for the session pointer -- presumably
 * only when the allocation failed; the guarding condition and the return
 * statement are not visible in this excerpt (TODO confirm).
 */
105 void *slp_session_key_init(const struct lu_context *ctx,
106 struct lu_context_key *key)
108 struct slp_session *session;
110 OBD_ALLOC_PTR(session);
112 session = ERR_PTR(-ENOMEM);
/*
 * lct_fini callback of slp_session_key: releases the session object
 * passed in \a data with OBD_FREE_PTR().
 */
116 void slp_session_key_fini(const struct lu_context *ctx,
117 struct lu_context_key *key, void *data)
119 struct slp_session *session = data;
120 OBD_FREE_PTR(session);
/*
 * Context key for the slp session data: tagged LCT_SESSION, with
 * slp_session_key_init()/slp_session_key_fini() as its init and fini
 * callbacks.
 */
123 struct lu_context_key slp_session_key = {
124 .lct_tags = LCT_SESSION,
125 .lct_init = slp_session_key_init,
126 .lct_fini = slp_session_key_fini
129 /* type constructor/destructor: slp_type_{init,fini,start,stop}().
 * NOTE(review): LU_TYPE_INIT_FINI is defined outside this file; it is
 * presumably what generates those four functions for the three context
 * keys listed here -- confirm against the macro definition. */
130 LU_TYPE_INIT_FINI(slp, &ccc_key, &ccc_session_key, &slp_session_key);
/*
 * ldto_device_alloc method of the slp device type.  Delegates to
 * ccc_device_alloc(), passing the slp lu-device and cl-device operation
 * tables, and returns whatever that call returns.
 */
132 static struct lu_device *slp_device_alloc(const struct lu_env *env,
133 struct lu_device_type *t,
134 struct lustre_cfg *cfg)
136 return ccc_device_alloc(env, t, cfg, &slp_lu_ops, &slp_cl_ops);
/*
 * coo_lock_init method of slp objects.  Delegates to ccc_lock_init(),
 * installing slp_lock_ops as the lock operations.
 */
139 static int slp_lock_init(const struct lu_env *env,
140 struct cl_object *obj, struct cl_lock *lock,
141 const struct cl_io *io)
143 return ccc_lock_init(env, obj, lock, io, &slp_lock_ops);
/*
 * cl_object operations of slp objects.  Page, lock and io initialisation
 * and attr_get are slp-specific; attr_set, conf_set and glimpse use the
 * shared ccc_ implementations.
 */
146 static const struct cl_object_operations slp_ops = {
147 .coo_page_init = slp_page_init,
148 .coo_lock_init = slp_lock_init,
149 .coo_io_init = slp_io_init,
150 .coo_attr_get = slp_attr_get,
151 .coo_attr_set = ccc_attr_set,
152 .coo_conf_set = ccc_conf_set,
153 .coo_glimpse = ccc_object_glimpse
/*
 * loo_object_print method: prints a one-line description of the object
 * through the printer \a p, in the form
 * LUSTRE_SLP_NAME "-object@%p(%p:%lu/%u)".
 *
 * The inode number is taken from llu_i2stat(inode) and the generation
 * from llu_i2info(inode)->lli_st_generation.  A NULL \a st prints 0 for
 * the inode number.  The condition under which \a st is looked up and
 * the fallback of the generation expression are not visible in this
 * excerpt.
 */
156 static int slp_object_print(const struct lu_env *env, void *cookie,
157 lu_printer_t p, const struct lu_object *o)
159 struct ccc_object *obj = lu2ccc(o);
160 struct inode *inode = obj->cob_inode;
161 struct intnl_stat *st = NULL;
164 st = llu_i2stat(inode);
166 return (*p)(env, cookie, LUSTRE_SLP_NAME"-object@%p(%p:%lu/%u)",
168 st ? (unsigned long)st->st_ino : 0UL,
169 inode ? (unsigned int)llu_i2info(inode)->lli_st_generation
/*
 * lu_object operations of slp objects.  Init and free come from the ccc
 * layer, print is slp_object_print(); the remaining methods are
 * explicitly left NULL.
 */
173 static const struct lu_object_operations slp_lu_obj_ops = {
174 .loo_object_init = ccc_object_init,
175 .loo_object_start = NULL,
176 .loo_object_delete = NULL,
177 .loo_object_release = NULL,
178 .loo_object_free = ccc_object_free,
179 .loo_object_print = slp_object_print,
180 .loo_object_invariant = NULL
/*
 * ldo_object_alloc method of the slp device.  Delegates to
 * ccc_object_alloc(), passing slp_ops and slp_lu_obj_ops as the
 * operation tables of the new object.
 */
183 static struct lu_object *slp_object_alloc(const struct lu_env *env,
184 const struct lu_object_header *hdr,
185 struct lu_device *dev)
187 return ccc_object_alloc(env, hdr, dev, &slp_ops, &slp_lu_obj_ops);
/* lu_device operations of the slp device: only object allocation is set. */
190 static const struct lu_device_operations slp_lu_ops = {
191 .ldo_object_alloc = slp_object_alloc
/* cl_device operations of the slp device: request init uses ccc_req_init. */
194 static const struct cl_device_operations slp_cl_ops = {
195 .cdo_req_init = ccc_req_init
/*
 * Device-type operations of slp.  Type-level init/fini/start/stop are
 * the slp_type_*() functions, device allocation is slp_device_alloc(),
 * and device free/init/fini use the shared ccc_ implementations.
 */
198 static const struct lu_device_type_operations slp_device_type_ops = {
199 .ldto_init = slp_type_init,
200 .ldto_fini = slp_type_fini,
202 .ldto_start = slp_type_start,
203 .ldto_stop = slp_type_stop,
205 .ldto_device_alloc = slp_device_alloc,
206 .ldto_device_free = ccc_device_free,
207 .ldto_device_init = ccc_device_init,
208 .ldto_device_fini = ccc_device_fini
/*
 * The slp device type: tagged LU_DEVICE_CL, named LUSTRE_SLP_NAME, using
 * slp_device_type_ops, with LCT_CL_THREAD as its context tags.
 */
211 struct lu_device_type slp_device_type = {
212 .ldt_tags = LU_DEVICE_CL,
213 .ldt_name = LUSTRE_SLP_NAME,
214 .ldt_ops = &slp_device_type_ops,
215 .ldt_ctx_tags = LCT_CL_THREAD
/*
 * Global initialisation of the slp layer: calls ccc_global_init() for
 * slp_device_type and stores its return value in \a result.  The
 * declaration of \a result and the return statement are not visible in
 * this excerpt.
 */
218 int slp_global_init(void)
222 result = ccc_global_init(&slp_device_type);
/*
 * Global finalisation of the slp layer: calls ccc_global_fini() for
 * slp_device_type.
 */
226 void slp_global_fini(void)
228 ccc_global_fini(&slp_device_type);
231 /*****************************************************************************
/*
 * coo_page_init method of slp objects: sets up the ccc_page slice of
 * \a page.
 *
 * The visible code records \a vmpage in the slice, tests
 * page->cp_type against CPT_CACHEABLE, adds the slice to the page with
 * slp_transient_page_ops and bumps the object's cob_transient_pages
 * counter.  The function ends with "return ERR_PTR(result)".
 *
 * NOTE(review): how \a cpg and \a result are obtained, and which branch
 * of the cp_type test the slice registration belongs to, are not
 * visible in this excerpt.
 */
237 static struct cl_page *slp_page_init(const struct lu_env *env,
238 struct cl_object *obj,
239 struct cl_page *page, cfs_page_t *vmpage)
241 struct ccc_page *cpg;
244 CLOBINVRNT(env, obj, ccc_object_invariant(obj));
248 cpg->cpg_page = vmpage;
250 if (page->cp_type == CPT_CACHEABLE) {
253 struct ccc_object *clobj = cl2ccc(obj);
255 cl_page_slice_add(page, &cpg->cpg_cl, obj,
256 &slp_transient_page_ops);
257 clobj->cob_transient_pages++;
262 return ERR_PTR(result);
/*
 * coo_io_init method of slp objects.  Registers the per-environment
 * ccc_io (from ccc_env_io()) as this layer's slice of \a io, using
 * ccc_io_ops.
 *
 * For CIT_READ and CIT_WRITE the byte count is read from
 * io->u.ci_rw.crw_count, stored in cui_tot_count, and cui_tot_nrsegs is
 * reset to 0.  The zero-count handling described by the quoted
 * comments, and the return value, are not visible in this excerpt.
 */
265 static int slp_io_init(const struct lu_env *env, struct cl_object *obj,
268 struct ccc_io *vio = ccc_env_io(env);
271 CLOBINVRNT(env, obj, ccc_object_invariant(obj));
273 cl_io_slice_add(io, &vio->cui_cl, obj, &ccc_io_ops);
274 if (io->ci_type == CIT_READ || io->ci_type == CIT_WRITE) {
277 count = io->u.ci_rw.crw_count;
278 /* "If nbyte is 0, read() will return 0 and have no other
279 * results." -- Single Unix Spec */
282 /* "If nbyte is 0, read() will return 0 and have no other
283 * results." -- Single Unix Spec */
287 vio->cui_tot_count = count;
288 vio->cui_tot_nrsegs = 0;
/*
 * coo_attr_get method of slp objects.  Copies size, block count and the
 * three timestamps from the inode's intnl_stat (llu_i2stat()) into
 * \a attr and returns 0.  KMS is not filled in here.
 */
295 static int slp_attr_get(const struct lu_env *env, struct cl_object *obj,
296 struct cl_attr *attr)
298 struct inode *inode = ccc_object_inode(obj);
299 struct intnl_stat *st = llu_i2stat(inode);
301 attr->cat_size = st->st_size;
302 attr->cat_blocks = st->st_blocks;
303 attr->cat_mtime = st->st_mtime;
304 attr->cat_atime = st->st_atime;
305 attr->cat_ctime = st->st_ctime;
306 /* KMS is not known by this layer */
307 return 0; /* layers below have to fill in the rest */
310 /*****************************************************************************
/*
 * Common page teardown: asserts that the slice has a vmpage attached and
 * releases it with llu_free_user_page().
 */
316 static void slp_page_fini_common(struct ccc_page *cp)
318 cfs_page_t *vmpage = cp->cpg_page;
320 LASSERT(vmpage != NULL);
321 llu_free_user_page(vmpage);
/*
 * Common completion handling for a page: detaches the sync-io anchor
 * from the slice (cpg_sync_io is reset to NULL) and reports \a ioret to
 * it via cl_sync_io_note().
 *
 * NOTE(review): any guard around these two statements (e.g. a NULL check
 * on the anchor) is not visible in this excerpt.
 */
325 static void slp_page_completion_common(const struct lu_env *env,
326 struct ccc_page *cp, int ioret)
328 struct cl_sync_io *anchor = cp->cpg_sync_io;
331 cp->cpg_sync_io = NULL;
332 cl_sync_io_note(anchor, ioret);
/*
 * cpo_completion handler installed in slp_transient_page_ops.  Converts
 * the slice to its ccc_page and forwards to
 * slp_page_completion_common() with \a ioret.
 */
338 static void slp_page_completion_read(const struct lu_env *env,
339 const struct cl_page_slice *slice,
342 struct ccc_page *cp = cl2ccc_page(slice);
345 slp_page_completion_common(env, cp, ioret);
/*
 * cpo_completion handler installed in slp_transient_page_ops.  Clears
 * the slice's cpg_write_queued flag and then forwards to
 * slp_page_completion_common() with \a ioret.
 *
 * NOTE(review): the statements that the "Only ioret == 0" remark below
 * refers to are not visible in this excerpt.
 */
350 static void slp_page_completion_write_common(const struct lu_env *env,
351 const struct cl_page_slice *slice,
354 struct ccc_page *cp = cl2ccc_page(slice);
357 cp->cpg_write_queued = 0;
359 * Only ioret == 0, write succeed, then this page could be
360 * deleted from the pending_writing count.
363 slp_page_completion_common(env, cp, ioret);
/*
 * cpo_is_vmlocked method installed in slp_transient_page_ops.
 * NOTE(review): the body is not visible in this excerpt, so the value it
 * returns cannot be documented here.
 */
366 static int slp_page_is_vmlocked(const struct lu_env *env,
367 const struct cl_page_slice *slice)
/*
 * cpo_fini method of transient pages.  Frees the user page through
 * slp_page_fini_common() and decrements the owning object's
 * cob_transient_pages counter (the counterpart of the increment done in
 * slp_page_init()).
 */
372 static void slp_transient_page_fini(const struct lu_env *env,
373 struct cl_page_slice *slice)
375 struct ccc_page *cp = cl2ccc_page(slice);
376 struct cl_page *clp = slice->cpl_page;
377 struct ccc_object *clobj = cl2ccc(clp->cp_obj);
379 slp_page_fini_common(cp);
380 clobj->cob_transient_pages--;
/*
 * Page operations of transient slp pages.  Ownership, discard, vmpage
 * and is_under_lock use the shared ccc_ implementations; is_vmlocked and
 * fini are slp-specific.
 *
 * NOTE(review): the two .cpo_completion entries belong to nested
 * designators that are not visible in this excerpt; judging by the
 * handler names they are the read and write completion slots -- confirm.
 */
384 static const struct cl_page_operations slp_transient_page_ops = {
385 .cpo_own = ccc_transient_page_own,
386 .cpo_assume = ccc_transient_page_assume,
387 .cpo_unassume = ccc_transient_page_unassume,
388 .cpo_disown = ccc_transient_page_disown,
389 .cpo_discard = ccc_transient_page_discard,
390 .cpo_vmpage = ccc_page_vmpage,
391 .cpo_is_vmlocked = slp_page_is_vmlocked,
392 .cpo_fini = slp_transient_page_fini,
393 .cpo_is_under_lock = ccc_page_is_under_lock,
396 .cpo_completion = slp_page_completion_read,
399 .cpo_completion = slp_page_completion_write_common,
404 /*****************************************************************************
/*
 * clo_enqueue method of slp locks.  Checks the object invariant and
 * calls liblustre_wait_event(0).  The io argument (named "_") is not
 * used by the visible code; the return statement is not visible in this
 * excerpt.
 */
410 static int slp_lock_enqueue(const struct lu_env *env,
411 const struct cl_lock_slice *slice,
412 struct cl_io *_, __u32 enqflags)
414 CLOBINVRNT(env, slice->cls_obj, ccc_object_invariant(slice->cls_obj));
416 liblustre_wait_event(0);
/*
 * Lock operations of slp locks.  Only enqueue is slp-specific; fini,
 * wait, unuse and fits_into use the shared ccc_ implementations.
 */
420 static const struct cl_lock_operations slp_lock_ops = {
421 .clo_fini = ccc_lock_fini,
422 .clo_enqueue = slp_lock_enqueue,
423 .clo_wait = ccc_lock_wait,
424 .clo_unuse = ccc_lock_unuse,
425 .clo_fits_into = ccc_lock_fits_into,
428 /*****************************************************************************
/*
 * Requests the extent lock [start, end] in \a mode for a read or write
 * io through ccc_io_one_lock().
 *
 * Per the inline remark, liblustre takes no lock unless the write is an
 * O_APPEND one; what exactly the non-append branch does, and the return
 * statement, are not visible in this excerpt.
 */
434 static int slp_io_rw_lock(const struct lu_env *env, struct cl_io *io,
435 enum cl_lock_mode mode, loff_t start, loff_t end)
439 LASSERT(io->ci_type == CIT_READ || io->ci_type == CIT_WRITE);
441 if (!io->u.ci_wr.wr_append) { // No lock without O_APPEND in liblustre
445 result = ccc_io_one_lock(env, io, 0, mode, start, end);
/*
 * cio_lock method for write io (see ccc_io_ops).  Computes the extent to
 * lock and calls slp_io_rw_lock() with CLM_WRITE.
 *
 * For an append write the extent ends at OBD_OBJECT_EOF (the assignment
 * of \a start in that branch is not visible in this excerpt); otherwise
 * it is [crw_pos, crw_pos + crw_count - 1].
 */
450 static int slp_io_write_lock(const struct lu_env *env,
451 const struct cl_io_slice *ios)
453 struct cl_io *io = ios->cis_io;
457 if (io->u.ci_wr.wr_append) {
459 end = OBD_OBJECT_EOF;
461 start = io->u.ci_wr.wr.crw_pos;
462 end = start + io->u.ci_wr.wr.crw_count - 1;
465 return slp_io_rw_lock(env, io, CLM_WRITE, start, end);
/*
 * cio_iter_init method installed in ccc_io_ops next to
 * slp_io_trunc_start().
 * NOTE(review): the body is not visible in this excerpt.
 */
469 static int slp_io_trunc_iter_init(const struct lu_env *env,
470 const struct cl_io_slice *ios)
/*
 * cio_start method installed in ccc_io_ops next to
 * slp_io_trunc_iter_init().
 * NOTE(review): the body is not visible in this excerpt.
 */
475 static int slp_io_trunc_start(const struct lu_env *env,
476 const struct cl_io_slice *ios)
/*
 * Builds a struct page describing one page-sized piece of a user buffer.
 * The visible code stores \a offset and the byte count in the page and
 * initialises its two list heads.
 *
 * NOTE(review): the end of the parameter list, the allocation of the
 * page, the use of \a index and \a addr, and the return statement are
 * not visible in this excerpt.
 */
481 static struct page *llu_get_user_page(int index, void *addr, int offset,
491 page->_offset = offset;
492 page->_count = count;
494 CFS_INIT_LIST_HEAD(&page->list);
495 CFS_INIT_LIST_HEAD(&page->_node);
/*
 * Releases a page obtained from llu_get_user_page(); called from
 * slp_page_fini_common().
 * NOTE(review): the body is not visible in this excerpt.
 */
500 static void llu_free_user_page(struct page *page)
/*
 * Performs the page io for one contiguous user buffer.
 *
 * \a buf / \a count describe the user buffer, \a pos the file offset it
 * maps to, and \a group the io group whose lig_rwcount accumulates the
 * number of bytes queued.
 *
 * The visible code proceeds in three stages:
 *  1. the buffer is cut along page boundaries; for each piece a user
 *     page is created, wrapped in a CPT_TRANSIENT cl_page, owned, tied
 *     to the sync-io anchor, clipped and added to io->ci_queue;
 *  2. the queue is submitted with cl_io_submit_rw() and the function
 *     waits on the anchor with cl_sync_io_wait();
 *  3. the queue is discarded, disowned and finalised.
 *
 * NOTE(review): loop control, error checks and the return statement are
 * not visible in this excerpt.
 */
505 static int llu_queue_pio(const struct lu_env *env, struct cl_io *io,
506 struct llu_io_group *group,
507 char *buf, size_t count, loff_t pos)
509 struct cl_object *obj = io->ci_obj;
510 struct inode *inode = ccc_object_inode(obj);
511 struct intnl_stat *st = llu_i2stat(inode);
512 struct obd_export *exp = llu_i2obdexp(inode);
514 int rc = 0, npages = 0, ret_bytes = 0;
517 struct ccc_page *clup;
518 struct cl_2queue *queue;
519 struct cl_sync_io *anchor = &ccc_env_info(env)->cti_sync_io;
525 local_lock = group->lig_params->lrp_lock_mode != LCK_NL;
527 queue = &io->ci_queue;
528 cl_2queue_init(queue);
531 /* prepare the pages array */
533 unsigned long index, offset, bytes;
/* split pos into page index and in-page offset; bytes is what is left
 * of that page starting at offset */
535 offset = (pos & ~CFS_PAGE_MASK);
536 index = pos >> CFS_PAGE_SHIFT;
537 bytes = CFS_PAGE_SIZE - offset;
541 /* prevent read beyond file range */
542 if (/* local_lock && */
543 io->ci_type == CIT_READ && pos + bytes >= st->st_size) {
544 if (pos >= st->st_size)
546 bytes = st->st_size - pos;
549 /* prepare page for this index */
550 page = llu_get_user_page(index, buf - offset, offset, bytes);
556 clp = cl_page_find(env, obj,
558 page, CPT_TRANSIENT);
565 rc = cl_page_own(env, io, clp);
567 LASSERT(clp->cp_state == CPS_FREEING);
568 cl_page_put(env, clp);
/* attach the anchor to this layer's slice so that page completion can
 * report to it (see slp_page_completion_common()) */
572 clup = cl2ccc_page(cl_page_at(clp, &slp_device_type));
573 clup->cpg_sync_io = anchor;
574 cl_2queue_add(queue, clp);
576 /* drop the reference count for cl_page_find, so that the page
577 * will be freed in cl_2queue_fini. */
578 cl_page_put(env, clp);
580 cl_page_clip(env, clp, offset, offset+bytes);
587 group->lig_rwcount += bytes;
592 cl_sync_io_init(anchor, npages);
593 /* printk("Inited anchor with %d pages\n", npages); */
596 rc = cl_io_submit_rw(env, io,
597 io->ci_type == CIT_READ ? CRT_READ :
601 /* If some pages weren't sent for any reason, count
602 * them as completed, to avoid infinite wait. */
603 cl_page_list_for_each(clp, &queue->c2_qin) {
604 CL_PAGE_DEBUG(D_ERROR, env, clp,
606 cl_sync_io_note(anchor, +1);
608 /* wait for the IO to be finished. */
609 rc = cl_sync_io_wait(env, io, &queue->c2_qout, anchor);
615 cl_2queue_discard(env, io, queue);
616 cl_2queue_disown(env, io, queue);
617 cl_2queue_fini(env, queue);
/*
 * Allocates an llu_io_group with OBD_ALLOC_PTR() and stores \a params in
 * its lig_params field.  Returns ERR_PTR(-ENOMEM) on the failure path.
 *
 * NOTE(review): the condition guarding the error return, any use of
 * \a inode and \a maxpages, and the final return are not visible in this
 * excerpt.
 */
623 struct llu_io_group * get_io_group(struct inode *inode, int maxpages,
624 struct lustre_rw_params *params)
626 struct llu_io_group *group;
628 OBD_ALLOC_PTR(group);
630 return ERR_PTR(-ENOMEM);
632 group->lig_params = params;
/*
 * Upper bound on the number of pages needed for an io of \a len bytes
 * spread over \a iovlen segments: \a len rounded up to whole pages,
 * plus 2, plus (iovlen - 1).
 */
637 static int max_io_pages(ssize_t len, int iovlen)
639 return (((len + CFS_PAGE_SIZE -1) / CFS_PAGE_SIZE) + 2 + iovlen - 1);
/*
 * Releases an io group obtained from get_io_group(); called from
 * slp_io_start().
 * NOTE(review): the body is not visible in this excerpt.
 */
642 void put_io_group(struct llu_io_group *group)
/*
 * cio_start method for read and write io (see ccc_io_ops).
 *
 * Visible steps:
 *  - take position and byte count from the write or read part of the io;
 *  - choose the rw parameters: LCK_PW for an append write, otherwise
 *    OBD_BRW_SRVLOCK with LCK_NL;
 *  - allocate an io group sized by max_io_pages();
 *  - call ccc_prep_size() for the last byte of the range;
 *  - for an append write, move the position to the current st_size;
 *  - for each iovec segment: reject bad pointers with -EFAULT, fail
 *    writes at or beyond lli_maxbytes with -EFBIG, clamp writes that
 *    would cross lli_maxbytes, and hand the segment to llu_queue_pio();
 *  - record the io group in the session's lis_groups array.
 *
 * NOTE(review): several conditions, the per-segment position/count
 * bookkeeping, the "out" label and the return statements are not
 * visible in this excerpt; put_io_group() is presumably the error-path
 * cleanup -- confirm.
 */
647 static int slp_io_start(const struct lu_env *env, const struct cl_io_slice *ios)
649 struct ccc_io *cio = cl2ccc_io(env, ios);
650 struct cl_io *io = ios->cis_io;
651 struct cl_object *obj = io->ci_obj;
652 struct inode *inode = ccc_object_inode(obj);
656 struct llu_io_group *iogroup;
657 struct lustre_rw_params p = {0};
659 struct intnl_stat *st = llu_i2stat(inode);
660 struct llu_inode_info *lli = llu_i2info(inode);
661 struct llu_io_session *session = cl2slp_io(env, ios)->sio_session;
662 int write = io->ci_type == CIT_WRITE;
664 CLOBINVRNT(env, obj, ccc_object_invariant(obj));
667 pos = io->u.ci_wr.wr.crw_pos;
668 cnt = io->u.ci_wr.wr.crw_count;
670 pos = io->u.ci_rd.rd.crw_pos;
671 cnt = io->u.ci_rd.rd.crw_count;
673 if (io->u.ci_wr.wr_append) {
674 p.lrp_lock_mode = LCK_PW;
676 p.lrp_brw_flags = OBD_BRW_SRVLOCK;
677 p.lrp_lock_mode = LCK_NL;
680 iogroup = get_io_group(inode, max_io_pages(cnt, cio->cui_nrsegs), &p);
682 RETURN(PTR_ERR(iogroup));
684 err = ccc_prep_size(env, obj, io, pos + cnt - 1, 0);
689 "%s ino %lu, "LPSZ" bytes, offset %lld, i_size %llu\n",
690 write?"Write":"Read", (unsigned long)st->st_ino,
691 cnt, (__u64)pos, (__u64)st->st_size);
693 if (write && io->u.ci_wr.wr_append)
694 pos = io->u.ci_wr.wr.crw_pos = st->st_size; /* XXX? Do we need to change io content too here? */
695 /* XXX What about if one write syscall writes at 2 different offsets? */
/* one llu_queue_pio() call per iovec segment */
697 for (iovidx = 0; iovidx < cio->cui_nrsegs; iovidx++) {
698 char *buf = (char *) cio->cui_iov[iovidx].iov_base;
699 size_t count = cio->cui_iov[iovidx].iov_len;
705 if (IS_BAD_PTR(buf) || IS_BAD_PTR(buf + count)) {
706 GOTO(out, err = -EFAULT);
709 if (io->ci_type == CIT_READ) {
710 if (/* local_lock && */ pos >= st->st_size)
712 } else if (io->ci_type == CIT_WRITE) {
713 if (pos >= lli->lli_maxbytes) {
714 GOTO(out, err = -EFBIG);
716 if (pos + count >= lli->lli_maxbytes)
717 count = lli->lli_maxbytes - pos;
722 ret = llu_queue_pio(env, io, iogroup, buf, count, pos);
724 GOTO(out, err = ret);
729 if (io->ci_type == CIT_WRITE) {
730 // obd_adjust_kms(exp, lsm, pos, 0); // XXX
731 if (pos > st->st_size)
738 LASSERT(cnt == 0 || io->ci_type == CIT_READ); /* libsysio should guarantee this */
740 session->lis_groups[session->lis_ngroups++] = iogroup;
744 put_io_group(iogroup);
/*
 * io operations installed by slp_io_init().  Four groups of entries are
 * visible: two that start with slp_io_start() and end with ccc_io_end()
 * (the second also locks through slp_io_write_lock()), one using the
 * slp_io_trunc_*() handlers, and one that only sets cio_fini.
 *
 * NOTE(review): the designators selecting the io type of each group are
 * not visible in this excerpt; by the handler names the groups are
 * presumably read, write, truncate and one further type -- confirm.
 */
748 static const struct cl_io_operations ccc_io_ops = {
751 .cio_fini = ccc_io_fini,
752 .cio_start = slp_io_start,
753 .cio_end = ccc_io_end
756 .cio_fini = ccc_io_fini,
757 .cio_lock = slp_io_write_lock,
758 .cio_start = slp_io_start,
759 .cio_end = ccc_io_end
762 .cio_fini = ccc_io_fini,
763 .cio_iter_init = slp_io_trunc_iter_init,
764 .cio_start = slp_io_trunc_start
767 .cio_fini = ccc_io_fini
/*
 * Returns the slp_io of the current environment (slp_env_io()).  The
 * \a slice argument is only passed to cl2ccc_io(), whose result is
 * discarded; per the inline comment that call is made for its assertion.
 */
772 static struct slp_io *cl2slp_io(const struct lu_env *env,
773 const struct cl_io_slice *slice)
775 /* We call it just for assertion here */
776 cl2ccc_io(env, slice);
778 return slp_env_io(env);
781 /*****************************************************************************
783 * Temporary prototype thing: mirror obd-devices into cl devices.
/*
 * Sets up the cl device stack for a mounted file system.
 *
 * Obtains an environment with cl_env_get(), creates an slp device on
 * top of the lu device of the data export's obd (cl_type_setup()),
 * records the resulting site in sbi->ll_site and releases the
 * environment.  Failures of cl_env_get() and cl_type_setup() are
 * returned as PTR_ERR() values.
 *
 * NOTE(review): the error-check conditions, any other sbi fields set
 * here, and the final return are not visible in this excerpt.
 */
787 int cl_sb_init(struct llu_sb_info *sbi)
789 struct cl_device *cl;
794 env = cl_env_get(&refcheck);
796 RETURN(PTR_ERR(env));
798 cl = cl_type_setup(env, NULL, &slp_device_type,
799 sbi->ll_dt_exp->exp_obd->obd_lu_dev);
801 GOTO(out, rc = PTR_ERR(cl));
804 sbi->ll_site = cl2lu_dev(cl)->ld_site;
806 cl_env_put(env, &refcheck);
810 int cl_sb_fini(struct llu_sb_info *sbi)
817 env = cl_env_get(&refcheck);
819 RETURN(PTR_ERR(env));
821 if (sbi->ll_cl != NULL) {
822 cl_stack_fini(env, sbi->ll_cl);
826 cl_env_put(env, &refcheck);
828 * If mount failed (sbi->ll_cl == NULL), and there are no other
829 * mounts, stop device types manually (this usually happens
830 * automatically when last device is destroyed).
833 cl_env_cache_purge(~0);