Whamcloud - gitweb
LU-327 cleanup the client import of mgc
[fs/lustre-release.git] / lustre / osc / osc_request.c
index 7cdf82f..4b5f2c3 100644 (file)
@@ -30,6 +30,9 @@
  * Use is subject to license terms.
  */
 /*
+ * Copyright (c) 2011 Whamcloud, Inc.
+ */
+/*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
  */
@@ -109,6 +112,7 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                         struct lov_mds_md *lmm, int lmm_bytes)
 {
         int lsm_size;
+        struct obd_import *imp = class_exp2cliimp(exp);
         ENTRY;
 
         if (lmm != NULL) {
@@ -156,7 +160,11 @@ static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
         }
 
-        (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
+        if (imp != NULL &&
+            (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
+                (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
+        else
+                (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
 
         RETURN(lsm_size);
 }
@@ -1117,7 +1125,7 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
                 CWARN("%s: available grant < 0, the OSS is probably not running"
                       " with patch from bug20278 (%ld) \n",
                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
-                /* workaround for 1.6 servers which do not have 
+                /* workaround for 1.6 servers which do not have
                  * the patch from bug20278 */
                 cli->cl_avail_grant = ocd->ocd_grant;
         }
@@ -1189,7 +1197,7 @@ static int check_write_rcs(struct ptlrpc_request *req,
 
         /* return error if any niobuf was in error */
         for (i = 0; i < niocount; i++) {
-                if (remote_rcs[i] < 0)
+                if ((int)remote_rcs[i] < 0)
                         return(remote_rcs[i]);
 
                 if (remote_rcs[i] != 0) {
@@ -1341,11 +1349,17 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         pg_prev = pga[0];
         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                 struct brw_page *pg = pga[i];
+                int poff = pg->off & ~CFS_PAGE_MASK;
 
                 LASSERT(pg->count > 0);
-                LASSERTF((pg->off & ~CFS_PAGE_MASK) + pg->count <= CFS_PAGE_SIZE,
-                         "i: %d pg: %p off: "LPU64", count: %u\n", i, pg,
-                         pg->off, pg->count);
+                /* make sure there is no gap in the middle of page array */
+                LASSERTF(page_count == 1 ||
+                         (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
+                          ergo(i > 0 && i < page_count - 1,
+                               poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
+                          ergo(i == page_count - 1, poff == 0)),
+                         "i: %d/%d pg: %p off: "LPU64", count: %u\n",
+                         i, page_count, pg, pg->off, pg->count);
 #ifdef __linux__
                 LASSERTF(i == 0 || pg->off > pg_prev->off,
                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
@@ -1361,8 +1375,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
                         (pg->flag & OBD_BRW_SRVLOCK));
 
-                ptlrpc_prep_bulk_page(desc, pg->pg, pg->off & ~CFS_PAGE_MASK,
-                                      pg->count);
+                ptlrpc_prep_bulk_page(desc, pg->pg, poff, pg->count);
                 requested_nob += pg->count;
 
                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
@@ -1625,7 +1638,7 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                 if (server_cksum == ~0 && rc > 0) {
                         CERROR("Protocol error: server %s set the 'checksum' "
                                "bit, but didn't send a checksum.  Not fatal, "
-                               "but please notify on http://bugzilla.lustre.org/\n",
+                               "but please notify on http://bugs.whamcloud.com/\n",
                                libcfs_nid2str(peer->nid));
                 } else if (server_cksum != client_cksum) {
                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
@@ -2426,26 +2439,22 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
         struct osc_brw_async_args *aa;
         const struct obd_async_page_ops *ops;
         CFS_LIST_HEAD(rpc_list);
-        CFS_LIST_HEAD(tmp_list);
-        unsigned int ending_offset;
-        unsigned  starting_offset = 0;
         int srvlock = 0, mem_tight = 0;
         struct cl_object *clob = NULL;
+        obd_off starting_offset = OBD_OBJECT_EOF;
+        unsigned int ending_offset;
+        int starting_page_off = 0;
         ENTRY;
 
         /* ASYNC_HP pages first. At present, when the lock the pages is
          * to be canceled, the pages covered by the lock will be sent out
          * with ASYNC_HP. We have to send out them as soon as possible. */
         cfs_list_for_each_entry_safe(oap, tmp, &lop->lop_urgent, oap_urgent_item) {
-                if (oap->oap_async_flags & ASYNC_HP) 
-                        cfs_list_move(&oap->oap_pending_item, &tmp_list);
-                else
-                        cfs_list_move_tail(&oap->oap_pending_item, &tmp_list);
+                if (oap->oap_async_flags & ASYNC_HP)
+                        cfs_list_move(&oap->oap_pending_item, &lop->lop_pending);
                 if (++page_count >= cli->cl_max_pages_per_rpc)
                         break;
         }
-
-        cfs_list_splice(&tmp_list, &lop->lop_pending);
         page_count = 0;
 
         /* first we find the pages we're allowed to work with */
@@ -2474,7 +2483,13 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
                 /* If there is a gap at the start of this page, it can't merge
                  * with any previous page, so we'll hand the network a
                  * "fragmented" page array that it can't transfer in 1 RDMA */
-                if (page_count != 0 && oap->oap_page_off != 0)
+                if (oap->oap_obj_off < starting_offset) {
+                        if (starting_page_off != 0)
+                                break;
+
+                        starting_page_off = oap->oap_page_off;
+                        starting_offset = oap->oap_obj_off + starting_page_off;
+                } else if (oap->oap_page_off != 0)
                         break;
 
                 /* in llite being 'ready' equates to the page being locked
@@ -2552,10 +2567,6 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
                 lop_update_pending(cli, lop, cmd, -1);
                 cfs_list_del_init(&oap->oap_urgent_item);
 
-                if (page_count == 0)
-                        starting_offset = (oap->oap_obj_off+oap->oap_page_off) &
-                                          (PTLRPC_MAX_BRW_SIZE - 1);
-
                 /* ask the caller for the size of the io as the rpc leaves. */
                 if (!(oap->oap_async_flags & ASYNC_COUNT_STABLE)) {
                         oap->oap_count =
@@ -2573,20 +2584,22 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
 
                 /* now put the page back in our accounting */
                 cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
+                if (page_count++ == 0)
+                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
+
                 if (oap->oap_brw_flags & OBD_BRW_MEMALLOC)
                         mem_tight = 1;
-                if (page_count == 0)
-                        srvlock = !!(oap->oap_brw_flags & OBD_BRW_SRVLOCK);
-                if (++page_count >= cli->cl_max_pages_per_rpc)
-                        break;
 
                 /* End on a PTLRPC_MAX_BRW_SIZE boundary.  We want full-sized
                  * RPCs aligned on PTLRPC_MAX_BRW_SIZE boundaries to help reads
                  * have the same alignment as the initial writes that allocated
                  * extents on the server. */
-                ending_offset = (oap->oap_obj_off + oap->oap_page_off +
-                                 oap->oap_count) & (PTLRPC_MAX_BRW_SIZE - 1);
-                if (ending_offset == 0)
+                ending_offset = oap->oap_obj_off + oap->oap_page_off +
+                                oap->oap_count;
+                if (!(ending_offset & (PTLRPC_MAX_BRW_SIZE - 1)))
+                        break;
+
+                if (page_count >= cli->cl_max_pages_per_rpc)
                         break;
 
                 /* If there is a gap at the end of this page, it can't merge
@@ -2620,6 +2633,7 @@ osc_send_oap_rpc(const struct lu_env *env, struct client_obd *cli,
 
         aa = ptlrpc_req_async_args(req);
 
+        starting_offset &= PTLRPC_MAX_BRW_SIZE - 1;
         if (cmd == OBD_BRW_READ) {
                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
@@ -2780,7 +2794,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
 
                         if (rc > 0)
                                 race_counter = 0;
-                        else
+                        else if (rc == 0)
                                 race_counter++;
                 }
                 if (lop_makes_rpc(cli, &loi->loi_read_lop, OBD_BRW_READ)) {
@@ -2791,7 +2805,7 @@ void osc_check_rpcs(const struct lu_env *env, struct client_obd *cli)
 
                         if (rc > 0)
                                 race_counter = 0;
-                        else
+                        else if (rc == 0)
                                 race_counter++;
                 }
 
@@ -2877,8 +2891,9 @@ static int osc_enter_cache(const struct lu_env *env,
 
         /* force the caller to try sync io.  this can jump the list
          * of queued writes and create a discontiguous rpc stream */
-        if (cli->cl_dirty_max < CFS_PAGE_SIZE || cli->cl_ar.ar_force_sync ||
-            loi->loi_ar.ar_force_sync)
+        if (OBD_FAIL_CHECK(OBD_FAIL_OSC_NO_GRANT) ||
+            cli->cl_dirty_max < CFS_PAGE_SIZE     ||
+            cli->cl_ar.ar_force_sync || loi->loi_ar.ar_force_sync)
                 RETURN(-EDQUOT);
 
         /* Hopefully normal case - cache space and write credits available */
@@ -2960,9 +2975,8 @@ int osc_prep_async_page(struct obd_export *exp, struct lov_stripe_md *lsm,
 
 int osc_queue_async_io(const struct lu_env *env, struct obd_export *exp,
                        struct lov_stripe_md *lsm, struct lov_oinfo *loi,
-                       struct osc_async_page *oap, int cmd, obd_off off,
-                       int count, obd_flag brw_flags,
-                       enum async_flags async_flags)
+                       struct osc_async_page *oap, int cmd, int off,
+                       int count, obd_flag brw_flags, enum async_flags async_flags)
 {
         struct client_obd *cli = &exp->exp_obd->u.cli;
         int rc = 0;
@@ -3607,8 +3621,8 @@ static int osc_statfs_interpret(const struct lu_env *env,
          *                   avail < ~0.1% max          max = avail + used
          *            1025 * avail < avail + used       used = blocks - free
          *            1024 * avail < used
-         *            1024 * avail < blocks - free                      
-         *                   avail < ((blocks - free) >> 10)    
+         *            1024 * avail < blocks - free
+         *                   avail < ((blocks - free) >> 10)
          *
          * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
          * lose that amount of space so in those cases we report no space left
@@ -4500,23 +4514,7 @@ static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
                  *   client_disconnect_export()
                  */
                 obd_zombie_barrier();
-                /* If we set up but never connected, the
-                   client import will not have been cleaned. */
-                if (obd->u.cli.cl_import) {
-                        struct obd_import *imp;
-                        cfs_down_write(&obd->u.cli.cl_sem);
-                        imp = obd->u.cli.cl_import;
-                        CDEBUG(D_CONFIG, "%s: client import never connected\n",
-                               obd->obd_name);
-                        ptlrpc_invalidate_import(imp);
-                        if (imp->imp_rq_pool) {
-                                ptlrpc_free_rq_pool(imp->imp_rq_pool);
-                                imp->imp_rq_pool = NULL;
-                        }
-                        class_destroy_import(imp);
-                        cfs_up_write(&obd->u.cli.cl_sem);
-                        obd->u.cli.cl_import = NULL;
-                }
+                obd_cleanup_client_import(obd);
                 rc = obd_llog_finish(obd, 0);
                 if (rc != 0)
                         CERROR("failed to cleanup llogging subsystems\n");