Whamcloud - gitweb
LU-1154 clio: rename coo_attr_set to coo_attr_update
[fs/lustre-release.git] / lustre / osc / osc_io.c
index a9bad93..d4eb1e3 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2013, Intel Corporation.
+ * Copyright (c) 2011, 2014, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -67,14 +67,18 @@ static struct osc_io *cl2osc_io(const struct lu_env *env,
         return oio;
 }
 
-static struct osc_page *osc_cl_page_osc(struct cl_page *page)
+static struct osc_page *osc_cl_page_osc(struct cl_page *page,
+                                       struct osc_object *osc)
 {
-        const struct cl_page_slice *slice;
+       const struct cl_page_slice *slice;
 
-        slice = cl_page_at(page, &osc_device_type);
-        LASSERT(slice != NULL);
+       if (osc != NULL)
+               slice = cl_object_page_slice(&osc->oo_cl, page);
+       else
+               slice = cl_page_at(page, &osc_device_type);
+       LASSERT(slice != NULL);
 
-        return cl2osc_page(slice);
+       return cl2osc_page(slice);
 }
 
 
@@ -99,21 +103,21 @@ static int osc_io_submit(const struct lu_env *env,
                          const struct cl_io_slice *ios,
                         enum cl_req_type crt, struct cl_2queue *queue)
 {
-        struct cl_page    *page;
-        struct cl_page    *tmp;
-        struct client_obd *cli  = NULL;
-        struct osc_object *osc  = NULL; /* to keep gcc happy */
-        struct osc_page   *opg;
-        struct cl_io      *io;
-       CFS_LIST_HEAD     (list);
+       struct cl_page    *page;
+       struct cl_page    *tmp;
+       struct client_obd *cli  = NULL;
+       struct osc_object *osc  = NULL; /* to keep gcc happy */
+       struct osc_page   *opg;
+       struct cl_io      *io;
+       struct list_head  list = LIST_HEAD_INIT(list);
 
        struct cl_page_list *qin      = &queue->c2_qin;
        struct cl_page_list *qout     = &queue->c2_qout;
-       int queued = 0;
+       unsigned int queued = 0;
        int result = 0;
        int cmd;
        int brw_flags;
-       int max_pages;
+       unsigned int max_pages;
 
        LASSERT(qin->pl_nr > 0);
 
@@ -137,12 +141,12 @@ static int osc_io_submit(const struct lu_env *env,
                 io = page->cp_owner;
                 LASSERT(io != NULL);
 
-                opg = osc_cl_page_osc(page);
-                oap = &opg->ops_oap;
+               opg = osc_cl_page_osc(page, osc);
+               oap = &opg->ops_oap;
                LASSERT(osc == oap->oap_obj);
 
-               if (!cfs_list_empty(&oap->oap_pending_item) ||
-                   !cfs_list_empty(&oap->oap_rpc_item)) {
+               if (!list_empty(&oap->oap_pending_item) ||
+                   !list_empty(&oap->oap_rpc_item)) {
                        CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n",
                               oap, opg);
                         result = -EBUSY;
@@ -163,14 +167,19 @@ static int osc_io_submit(const struct lu_env *env,
                        continue;
                 }
 
-               cl_page_list_move(qout, qin, page);
                spin_lock(&oap->oap_lock);
                oap->oap_async_flags = ASYNC_URGENT|ASYNC_READY;
                oap->oap_async_flags |= ASYNC_COUNT_STABLE;
                spin_unlock(&oap->oap_lock);
 
                osc_page_submit(env, opg, crt, brw_flags);
-               cfs_list_add_tail(&oap->oap_pending_item, &list);
+               list_add_tail(&oap->oap_pending_item, &list);
+
+               if (page->cp_sync_io != NULL)
+                       cl_page_list_move(qout, qin, page);
+               else /* async IO */
+                       cl_page_list_del(env, qin, page);
+
                if (++queued == max_pages) {
                        queued = 0;
                        result = osc_queue_sync_pages(env, osc, &list, cmd,
@@ -195,7 +204,7 @@ static int osc_io_submit(const struct lu_env *env,
  * Expand stripe KMS if necessary.
  */
 static void osc_page_touch_at(const struct lu_env *env,
-                              struct cl_object *obj, pgoff_t idx, unsigned to)
+                             struct cl_object *obj, pgoff_t idx, size_t to)
 {
         struct lov_oinfo  *loi  = cl2osc(obj)->oo_oinfo;
         struct cl_attr    *attr = &osc_env_info(env)->oti_attr;
@@ -219,16 +228,16 @@ static void osc_page_touch_at(const struct lu_env *env,
 
        attr->cat_mtime = attr->cat_ctime = LTIME_S(CFS_CURRENT_TIME);
        valid = CAT_MTIME | CAT_CTIME;
-        if (kms > loi->loi_kms) {
-                attr->cat_kms = kms;
-                valid |= CAT_KMS;
-        }
-        if (kms > loi->loi_lvb.lvb_size) {
-                attr->cat_size = kms;
-                valid |= CAT_SIZE;
-        }
-        cl_object_attr_set(env, obj, attr, valid);
-        cl_object_attr_unlock(obj);
+       if (kms > loi->loi_kms) {
+               attr->cat_kms = kms;
+               valid |= CAT_KMS;
+       }
+       if (kms > loi->loi_lvb.lvb_size) {
+               attr->cat_size = kms;
+               valid |= CAT_SIZE;
+       }
+       cl_object_attr_update(env, obj, attr, valid);
+       cl_object_attr_unlock(obj);
 }
 
 static int osc_io_commit_async(const struct lu_env *env,
@@ -261,18 +270,14 @@ static int osc_io_commit_async(const struct lu_env *env,
                }
        }
 
-       /*
-        * NOTE: here @page is a top-level page. This is done to avoid
-        * creation of sub-page-list.
-        */
        while (qin->pl_nr > 0) {
                struct osc_async_page *oap;
 
                page = cl_page_list_first(qin);
-               opg = osc_cl_page_osc(page);
+               opg = osc_cl_page_osc(page, osc);
                oap = &opg->ops_oap;
 
-               if (!cfs_list_empty(&oap->oap_rpc_item)) {
+               if (!list_empty(&oap->oap_rpc_item)) {
                        CDEBUG(D_CACHE, "Busy oap %p page %p for submit.\n",
                               oap, opg);
                        result = -EBUSY;
@@ -280,14 +285,13 @@ static int osc_io_commit_async(const struct lu_env *env,
                }
 
                /* The page may be already in dirty cache. */
-               if (cfs_list_empty(&oap->oap_pending_item)) {
+               if (list_empty(&oap->oap_pending_item)) {
                        result = osc_page_cache_add(env, &opg->ops_cl, io);
                        if (result != 0)
                                break;
                }
 
-               osc_page_touch_at(env, osc2cl(osc),
-                                 opg->ops_cl.cpl_page->cp_index,
+               osc_page_touch_at(env, osc2cl(osc), osc_index(opg),
                                  page == last_page ? to : PAGE_SIZE);
 
                cl_page_list_del(env, qin, page);
@@ -317,8 +321,8 @@ static int osc_io_rw_iter_init(const struct lu_env *env,
        struct osc_object *osc = cl2osc(ios->cis_obj);
        struct client_obd *cli = osc_cli(osc);
        unsigned long c;
-       unsigned int npages;
-       unsigned int max_pages;
+       unsigned long npages;
+       unsigned long max_pages;
        ENTRY;
 
        if (cl_io_is_append(io))
@@ -332,15 +336,15 @@ static int osc_io_rw_iter_init(const struct lu_env *env,
        if (npages > max_pages)
                npages = max_pages;
 
-       c = cfs_atomic_read(cli->cl_lru_left);
+       c = atomic_long_read(cli->cl_lru_left);
        if (c < npages && osc_lru_reclaim(cli) > 0)
-               c = cfs_atomic_read(cli->cl_lru_left);
+               c = atomic_long_read(cli->cl_lru_left);
        while (c >= npages) {
-               if (c == cfs_atomic_cmpxchg(cli->cl_lru_left, c, c - npages)) {
+               if (c == atomic_long_cmpxchg(cli->cl_lru_left, c, c - npages)) {
                        oio->oi_lru_reserved = npages;
                        break;
                }
-               c = cfs_atomic_read(cli->cl_lru_left);
+               c = atomic_long_read(cli->cl_lru_left);
        }
 
        RETURN(0);
@@ -354,32 +358,32 @@ static void osc_io_rw_iter_fini(const struct lu_env *env,
        struct client_obd *cli = osc_cli(osc);
 
        if (oio->oi_lru_reserved > 0) {
-               cfs_atomic_add(oio->oi_lru_reserved, cli->cl_lru_left);
+               atomic_long_add(oio->oi_lru_reserved, cli->cl_lru_left);
                oio->oi_lru_reserved = 0;
        }
+       oio->oi_write_osclock = NULL;
 }
 
 static int osc_io_fault_start(const struct lu_env *env,
-                              const struct cl_io_slice *ios)
+                             const struct cl_io_slice *ios)
 {
-        struct cl_io       *io;
-        struct cl_fault_io *fio;
-
-        ENTRY;
+       struct cl_io       *io;
+       struct cl_fault_io *fio;
+       ENTRY;
 
-        io  = ios->cis_io;
-        fio = &io->u.ci_fault;
-        CDEBUG(D_INFO, "%lu %d %d\n",
-               fio->ft_index, fio->ft_writable, fio->ft_nob);
-        /*
-         * If mapping is writeable, adjust kms to cover this page,
-         * but do not extend kms beyond actual file size.
-         * See bug 10919.
-         */
-        if (fio->ft_writable)
-                osc_page_touch_at(env, ios->cis_obj,
-                                  fio->ft_index, fio->ft_nob);
-        RETURN(0);
+       io  = ios->cis_io;
+       fio = &io->u.ci_fault;
+       CDEBUG(D_INFO, "%lu %d %zu\n",
+               fio->ft_index, fio->ft_writable, fio->ft_nob);
+       /*
+        * If mapping is writeable, adjust kms to cover this page,
+        * but do not extend kms beyond actual file size.
+        * See bug 10919.
+        */
+       if (fio->ft_writable)
+               osc_page_touch_at(env, ios->cis_obj,
+                                 fio->ft_index, fio->ft_nob);
+       RETURN(0);
 }
 
 static int osc_async_upcall(void *a, int rc)
@@ -391,7 +395,6 @@ static int osc_async_upcall(void *a, int rc)
         return 0;
 }
 
-#if defined(__KERNEL__)
 /**
  * Checks that there are no pages being written in the extent being truncated.
  */
@@ -404,19 +407,13 @@ static int trunc_check_cb(const struct lu_env *env, struct cl_io *io,
 
        oap = &ops->ops_oap;
        if (oap->oap_cmd & OBD_BRW_WRITE &&
-           !cfs_list_empty(&oap->oap_pending_item))
+           !list_empty(&oap->oap_pending_item))
                CL_PAGE_DEBUG(D_ERROR, env, page, "exists " LPU64 "/%s.\n",
                                start, current->comm);
 
-#ifdef __linux__
-       {
-               struct page *vmpage = cl_page_vmpage(env, page);
-               if (PageLocked(vmpage))
-                       CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
-                              ops, page->cp_index,
-                              (oap->oap_cmd & OBD_BRW_RWMASK));
-       }
-#endif
+       if (PageLocked(page->cp_vmpage))
+               CDEBUG(D_CACHE, "page %p index %lu locked for %d.\n",
+                      ops, osc_index(ops), oap->oap_cmd & OBD_BRW_RWMASK);
 
        return CLP_GANG_OKAY;
 }
@@ -439,13 +436,6 @@ static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
                                start + partial, CL_PAGE_EOF,
                                trunc_check_cb, (void *)&size);
 }
-#else /* __KERNEL__ */
-static void osc_trunc_check(const struct lu_env *env, struct cl_io *io,
-                           struct osc_io *oio, __u64 size)
-{
-       return;
-}
-#endif
 
 static int osc_io_setattr_start(const struct lu_env *env,
                                 const struct cl_io_slice *slice)
@@ -487,14 +477,15 @@ static int osc_io_setattr_start(const struct lu_env *env,
                        }
                        if (ia_valid & ATTR_CTIME_SET) {
                                attr->cat_ctime = lvb->lvb_ctime;
-                                cl_valid |= CAT_CTIME;
-                        }
-                        result = cl_object_attr_set(env, obj, attr, cl_valid);
-                }
-                cl_object_attr_unlock(obj);
-        }
-        memset(oa, 0, sizeof(*oa));
-        if (result == 0) {
+                               cl_valid |= CAT_CTIME;
+                       }
+                       result = cl_object_attr_update(env, obj, attr,
+                                                      cl_valid);
+               }
+               cl_object_attr_unlock(obj);
+       }
+       memset(oa, 0, sizeof(*oa));
+       if (result == 0) {
                oa->o_oi = loi->loi_oi;
                oa->o_mtime = attr->cat_mtime;
                oa->o_atime = attr->cat_atime;
@@ -569,16 +560,15 @@ static void osc_io_setattr_end(const struct lu_env *env,
 static int osc_io_read_start(const struct lu_env *env,
                              const struct cl_io_slice *slice)
 {
-       struct osc_io    *oio  = cl2osc_io(env, slice);
        struct cl_object *obj  = slice->cis_obj;
        struct cl_attr   *attr = &osc_env_info(env)->oti_attr;
        int rc = 0;
        ENTRY;
 
-       if (oio->oi_lockless == 0 && !slice->cis_io->ci_noatime) {
+       if (!slice->cis_io->ci_noatime) {
                cl_object_attr_lock(obj);
                attr->cat_atime = LTIME_S(CFS_CURRENT_TIME);
-               rc = cl_object_attr_set(env, obj, attr, CAT_ATIME);
+               rc = cl_object_attr_update(env, obj, attr, CAT_ATIME);
                cl_object_attr_unlock(obj);
        }
 
@@ -588,25 +578,18 @@ static int osc_io_read_start(const struct lu_env *env,
 static int osc_io_write_start(const struct lu_env *env,
                               const struct cl_io_slice *slice)
 {
-        struct osc_io    *oio   = cl2osc_io(env, slice);
-        struct cl_object *obj   = slice->cis_obj;
-        struct cl_attr   *attr  = &osc_env_info(env)->oti_attr;
-        int              result = 0;
-        ENTRY;
-
-        if (oio->oi_lockless == 0) {
-               OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_SETTIME, 1);
-                cl_object_attr_lock(obj);
-                result = cl_object_attr_get(env, obj, attr);
-                if (result == 0) {
-                        attr->cat_mtime = attr->cat_ctime =
-                                LTIME_S(CFS_CURRENT_TIME);
-                        result = cl_object_attr_set(env, obj, attr,
-                                                    CAT_MTIME | CAT_CTIME);
-                }
-                cl_object_attr_unlock(obj);
-        }
-        RETURN(result);
+       struct cl_object *obj   = slice->cis_obj;
+       struct cl_attr   *attr  = &osc_env_info(env)->oti_attr;
+       int rc = 0;
+       ENTRY;
+
+       OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_DELAY_SETTIME, 1);
+       cl_object_attr_lock(obj);
+       attr->cat_mtime = attr->cat_ctime = LTIME_S(CFS_CURRENT_TIME);
+       rc = cl_object_attr_update(env, obj, attr, CAT_MTIME | CAT_CTIME);
+       cl_object_attr_unlock(obj);
+
+       RETURN(rc);
 }
 
 static int osc_fsync_ost(const struct lu_env *env, struct osc_object *obj,
@@ -783,8 +766,7 @@ static void osc_req_attr_set(const struct lu_env *env,
        struct lov_oinfo *oinfo;
        struct cl_req    *clerq;
        struct cl_page   *apage; /* _some_ page in @clerq */
-       struct cl_lock   *lock;  /* _some_ lock protecting @apage */
-       struct osc_lock  *olck;
+       struct ldlm_lock *lock;  /* _some_ lock protecting @apage */
        struct osc_page  *opg;
        struct obdo      *oa;
        struct ost_lvb   *lvb;
@@ -814,38 +796,37 @@ static void osc_req_attr_set(const struct lu_env *env,
                oa->o_valid |= OBD_MD_FLID;
        }
        if (flags & OBD_MD_FLHANDLE) {
-                clerq = slice->crs_req;
-                LASSERT(!cfs_list_empty(&clerq->crq_pages));
-                apage = container_of(clerq->crq_pages.next,
-                                     struct cl_page, cp_flight);
-                opg = osc_cl_page_osc(apage);
-                apage = opg->ops_cl.cpl_page; /* now apage is a sub-page */
-                lock = cl_lock_at_page(env, apage->cp_obj, apage, NULL, 1, 1);
-                if (lock == NULL) {
-                        struct cl_object_header *head;
-                        struct cl_lock          *scan;
-
-                        head = cl_object_header(apage->cp_obj);
-                        cfs_list_for_each_entry(scan, &head->coh_locks,
-                                                cll_linkage)
-                                CL_LOCK_DEBUG(D_ERROR, env, scan,
-                                              "no cover page!\n");
-                        CL_PAGE_DEBUG(D_ERROR, env, apage,
-                                      "dump uncover page!\n");
-                        libcfs_debug_dumpstack(NULL);
-                        LBUG();
-                }
+               clerq = slice->crs_req;
+               LASSERT(!list_empty(&clerq->crq_pages));
+               apage = container_of(clerq->crq_pages.next,
+                                    struct cl_page, cp_flight);
+               opg = osc_cl_page_osc(apage, NULL);
+               lock = osc_dlmlock_at_pgoff(env, cl2osc(obj), osc_index(opg),
+                                           1, 1);
+               if (lock == NULL && !opg->ops_srvlock) {
+                       struct ldlm_resource *res;
+                       struct ldlm_res_id *resname;
+
+                       CL_PAGE_DEBUG(D_ERROR, env, apage, "uncovered page!\n");
+
+                       resname = &osc_env_info(env)->oti_resname;
+                       ostid_build_res_name(&oinfo->loi_oi, resname);
+                       res = ldlm_resource_get(
+                               osc_export(cl2osc(obj))->exp_obd->obd_namespace,
+                               NULL, resname, LDLM_EXTENT, 0);
+                       ldlm_resource_dump(D_ERROR, res);
+
+                       libcfs_debug_dumpstack(NULL);
+                       LBUG();
+               }
 
-                olck = osc_lock_at(lock);
-                LASSERT(olck != NULL);
-                LASSERT(ergo(opg->ops_srvlock, olck->ols_lock == NULL));
-                /* check for lockless io. */
-                if (olck->ols_lock != NULL) {
-                        oa->o_handle = olck->ols_lock->l_remote_handle;
-                        oa->o_valid |= OBD_MD_FLHANDLE;
-                }
-                cl_lock_put(env, lock);
-        }
+               /* check for lockless io. */
+               if (lock != NULL) {
+                       oa->o_handle = lock->l_remote_handle;
+                       oa->o_valid |= OBD_MD_FLHANDLE;
+                       LDLM_LOCK_PUT(lock);
+               }
+       }
 }
 
 static const struct cl_req_operations osc_req_ops = {
@@ -866,18 +847,18 @@ int osc_io_init(const struct lu_env *env,
 }
 
 int osc_req_init(const struct lu_env *env, struct cl_device *dev,
-                 struct cl_req *req)
+                struct cl_req *req)
 {
-        struct osc_req *or;
-        int result;
-
-       OBD_SLAB_ALLOC_PTR_GFP(or, osc_req_kmem, __GFP_IO);
-        if (or != NULL) {
-                cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops);
-                result = 0;
-        } else
-                result = -ENOMEM;
-        return result;
+       struct osc_req *or;
+       int result;
+
+       OBD_SLAB_ALLOC_PTR_GFP(or, osc_req_kmem, GFP_NOFS);
+       if (or != NULL) {
+               cl_req_slice_add(req, &or->or_cl, dev, &osc_req_ops);
+               result = 0;
+       } else
+               result = -ENOMEM;
+       return result;
 }
 
 /** @} osc */