Whamcloud - gitweb
LU-1194 llog: fix for not sync llcd at thread stop
[fs/lustre-release.git] / lustre / osc / osc_request.c
index 0de44b7..24832ea 100644 (file)
@@ -568,7 +568,7 @@ static int osc_sync_interpret(const struct lu_env *env,
                               struct ptlrpc_request *req,
                               void *arg, int rc)
 {
-        struct osc_async_args *aa = arg;
+       struct osc_fsync_args *fa = arg;
         struct ost_body *body;
         ENTRY;
 
@@ -581,27 +581,22 @@ static int osc_sync_interpret(const struct lu_env *env,
                 GOTO(out, rc = -EPROTO);
         }
 
-        *aa->aa_oi->oi_oa = body->oa;
+       *fa->fa_oi->oi_oa = body->oa;
 out:
-        rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
-        RETURN(rc);
+       rc = fa->fa_upcall(fa->fa_cookie, rc);
+       RETURN(rc);
 }
 
-static int osc_sync(const struct lu_env *env, struct obd_export *exp,
-                    struct obd_info *oinfo, obd_size start, obd_size end,
-                    struct ptlrpc_request_set *set)
+int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
+                 obd_enqueue_update_f upcall, void *cookie,
+                  struct ptlrpc_request_set *rqset)
 {
-        struct ptlrpc_request *req;
-        struct ost_body       *body;
-        struct osc_async_args *aa;
+       struct ptlrpc_request *req;
+       struct ost_body       *body;
+       struct osc_fsync_args *fa;
         int                    rc;
         ENTRY;
 
-        if (!oinfo->oi_oa) {
-                CDEBUG(D_INFO, "oa NULL\n");
-                RETURN(-EINVAL);
-        }
-
         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
         if (req == NULL)
                 RETURN(-ENOMEM);
@@ -617,20 +612,41 @@ static int osc_sync(const struct lu_env *env, struct obd_export *exp,
         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
         LASSERT(body);
         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
-        body->oa.o_size = start;
-        body->oa.o_blocks = end;
-        body->oa.o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
         osc_pack_capa(req, body, oinfo->oi_capa);
 
         ptlrpc_request_set_replen(req);
         req->rq_interpret_reply = osc_sync_interpret;
 
-        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-        aa = ptlrpc_req_async_args(req);
-        aa->aa_oi = oinfo;
+       CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
+       fa = ptlrpc_req_async_args(req);
+       fa->fa_oi = oinfo;
+       fa->fa_upcall = upcall;
+       fa->fa_cookie = cookie;
 
-        ptlrpc_set_add_req(set, req);
-        RETURN (0);
+       if (rqset == PTLRPCD_SET)
+               ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
+       else
+               ptlrpc_set_add_req(rqset, req);
+
+       RETURN (0);
+}
+
+static int osc_sync(const struct lu_env *env, struct obd_export *exp,
+                   struct obd_info *oinfo, obd_size start, obd_size end,
+                   struct ptlrpc_request_set *set)
+{
+       ENTRY;
+
+       if (!oinfo->oi_oa) {
+               CDEBUG(D_INFO, "oa NULL\n");
+               RETURN(-EINVAL);
+       }
+
+       oinfo->oi_oa->o_size = start;
+       oinfo->oi_oa->o_blocks = end;
+       oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
+
+       RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
 }
 
 /* Find and cancel locally locks matched by @mode in the resource found by
@@ -799,7 +815,7 @@ static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
                                 (cli->cl_max_rpcs_in_flight + 1);
                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
         }
-        oa->o_grant = cli->cl_avail_grant;
+       oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
         oa->o_dropped = cli->cl_lost_grant;
         cli->cl_lost_grant = 0;
         client_obd_list_unlock(&cli->cl_loi_list_lock);
@@ -1014,15 +1030,17 @@ static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
                 cli->cl_avail_grant = ocd->ocd_grant;
         }
 
-        client_obd_list_unlock(&cli->cl_loi_list_lock);
+       /* determine the appropriate chunk size used by osc_extent. */
+       cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
 
-        CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld \n",
-               cli->cl_import->imp_obd->obd_name,
-               cli->cl_avail_grant, cli->cl_lost_grant);
+       CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
+               "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
+               cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
 
-        if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
-            cfs_list_empty(&cli->cl_grant_shrink_list))
-                osc_add_shrink_grant(cli);
+       if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
+           cfs_list_empty(&cli->cl_grant_shrink_list))
+               osc_add_shrink_grant(cli);
 }
 
 /* We assume that the reason this OSC got a short read is because it read
@@ -1636,7 +1654,6 @@ int osc_brw_redo_request(struct ptlrpc_request *request,
                          struct osc_brw_async_args *aa)
 {
         struct ptlrpc_request *new_req;
-        struct ptlrpc_request_set *set = request->rq_set;
         struct osc_brw_async_args *new_aa;
         struct osc_async_page *oap;
         int rc = 0;
@@ -1653,15 +1670,12 @@ int osc_brw_redo_request(struct ptlrpc_request *request,
         if (rc)
                 RETURN(rc);
 
-        client_obd_list_lock(&aa->aa_cli->cl_loi_list_lock);
-
         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
                 if (oap->oap_request != NULL) {
                         LASSERTF(request == oap->oap_request,
                                  "request %p != oap_request %p\n",
                                  request, oap->oap_request);
                         if (oap->oap_interrupted) {
-                                client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
                                 ptlrpc_req_finished(new_req);
                                 RETURN(-EINTR);
                         }
@@ -1679,8 +1693,9 @@ int osc_brw_redo_request(struct ptlrpc_request *request,
         new_aa = ptlrpc_req_async_args(new_req);
 
         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
-        cfs_list_splice(&aa->aa_oaps, &new_aa->aa_oaps);
-        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
+       cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
+       CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
+       cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
 
         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
                 if (oap->oap_request) {
@@ -1692,16 +1707,14 @@ int osc_brw_redo_request(struct ptlrpc_request *request,
         new_aa->aa_ocapa = aa->aa_ocapa;
         aa->aa_ocapa = NULL;
 
-        /* use ptlrpc_set_add_req is safe because interpret functions work
-         * in check_set context. only one way exist with access to request
-         * from different thread got -EINTR - this way protected with
-         * cl_loi_list_lock */
-        ptlrpc_set_add_req(set, new_req);
+       /* XXX: This code will run into problem if we're going to support
+        * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
+        * and wait for all of them to be finished. We should inherit request
+        * set from old request. */
+       ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
 
-        client_obd_list_unlock(&aa->aa_cli->cl_loi_list_lock);
-
-        DEBUG_REQ(D_INFO, new_req, "new request");
-        RETURN(0);
+       DEBUG_REQ(D_INFO, new_req, "new request");
+       RETURN(0);
 }
 
 /*
@@ -1858,9 +1871,11 @@ out:
 static int brw_interpret(const struct lu_env *env,
                          struct ptlrpc_request *req, void *data, int rc)
 {
-        struct osc_brw_async_args *aa = data;
-       struct osc_async_page *oap, *tmp;
-        struct client_obd *cli;
+       struct osc_brw_async_args *aa = data;
+       struct osc_extent *ext;
+       struct osc_extent *tmp;
+       struct cl_object  *obj = NULL;
+       struct client_obd *cli = aa->aa_cli;
         ENTRY;
 
         rc = osc_brw_fini_request(req, rc);
@@ -1895,46 +1910,80 @@ static int brw_interpret(const struct lu_env *env,
                 aa->aa_ocapa = NULL;
         }
 
-        cli = aa->aa_cli;
-        client_obd_list_lock(&cli->cl_loi_list_lock);
+       cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
+               if (obj == NULL && rc == 0) {
+                       obj = osc2cl(ext->oe_obj);
+                       cl_object_get(obj);
+               }
 
-        /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
-         * is called so we know whether to go to sync BRWs or wait for more
-         * RPCs to complete */
-        if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
-                cli->cl_w_in_flight--;
-        else
-                cli->cl_r_in_flight--;
-
-       /* the caller may re-use the oap after the completion call so
-        * we need to clean it up a little */
-       cfs_list_for_each_entry_safe(oap, tmp, &aa->aa_oaps,
-                       oap_rpc_item) {
-               cfs_list_del_init(&oap->oap_rpc_item);
-               osc_ap_completion(env, cli, aa->aa_oa, oap, 1, rc);
+               cfs_list_del_init(&ext->oe_link);
+               osc_extent_finish(env, ext, 1, rc);
+       }
+       LASSERT(cfs_list_empty(&aa->aa_exts));
+       LASSERT(cfs_list_empty(&aa->aa_oaps));
+
+       if (obj != NULL) {
+               struct obdo *oa = aa->aa_oa;
+               struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
+               unsigned long valid = 0;
+
+               LASSERT(rc == 0);
+               if (oa->o_valid & OBD_MD_FLBLOCKS) {
+                       attr->cat_blocks = oa->o_blocks;
+                       valid |= CAT_BLOCKS;
+               }
+               if (oa->o_valid & OBD_MD_FLMTIME) {
+                       attr->cat_mtime = oa->o_mtime;
+                       valid |= CAT_MTIME;
+               }
+               if (oa->o_valid & OBD_MD_FLATIME) {
+                       attr->cat_atime = oa->o_atime;
+                       valid |= CAT_ATIME;
+               }
+               if (oa->o_valid & OBD_MD_FLCTIME) {
+                       attr->cat_ctime = oa->o_ctime;
+                       valid |= CAT_CTIME;
+               }
+               if (valid != 0) {
+                       cl_object_attr_lock(obj);
+                       cl_object_attr_set(env, obj, attr, valid);
+                       cl_object_attr_unlock(obj);
+               }
+               cl_object_put(env, obj);
        }
        OBDO_FREE(aa->aa_oa);
 
-       osc_wake_cache_waiters(cli);
-       osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
-       client_obd_list_unlock(&cli->cl_loi_list_lock);
-
        cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
                          req->rq_bulk->bd_nob_transferred);
        osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
        ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
 
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
+        * is called so we know whether to go to sync BRWs or wait for more
+        * RPCs to complete */
+       if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
+               cli->cl_w_in_flight--;
+       else
+               cli->cl_r_in_flight--;
+       osc_wake_cache_waiters(cli);
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+       osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
        RETURN(rc);
 }
 
-/* The most tricky part of this function is that it will return with
- * cli->cli_loi_list_lock held.
+/**
+ * Build an RPC by the list of extent @ext_list. The caller must ensure
+ * that the total pages in this list are NOT over max pages per RPC.
+ * Extents in the list must be in OES_RPC state.
  */
 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
-                 cfs_list_t *rpc_list, int page_count, int cmd,
-                 pdl_policy_t pol)
+                 cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
 {
        struct ptlrpc_request *req = NULL;
+       struct osc_extent *ext;
+       CFS_LIST_HEAD(rpc_list);
        struct brw_page **pga = NULL;
        struct osc_brw_async_args *aa = NULL;
         struct obdo *oa = NULL;
@@ -1944,17 +1993,39 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
         struct ldlm_lock *lock = NULL;
         struct cl_req_attr crattr;
-        int i, rc, mpflag = 0;
-
-        ENTRY;
-        LASSERT(!cfs_list_empty(rpc_list));
+       obd_off starting_offset = OBD_OBJECT_EOF;
+       obd_off ending_offset = 0;
+       int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
+
+       ENTRY;
+       LASSERT(!cfs_list_empty(ext_list));
+
+       /* add pages into rpc_list to build BRW rpc */
+       cfs_list_for_each_entry(ext, ext_list, oe_link) {
+               LASSERT(ext->oe_state == OES_RPC);
+               mem_tight |= ext->oe_memalloc;
+               cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
+                       ++page_count;
+                       cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
+                       if (starting_offset > oap->oap_obj_off)
+                               starting_offset = oap->oap_obj_off;
+                       else
+                               LASSERT(oap->oap_page_off == 0);
+                       if (ending_offset < oap->oap_obj_off + oap->oap_count)
+                               ending_offset = oap->oap_obj_off +
+                                               oap->oap_count;
+                       else
+                               LASSERT(oap->oap_page_off + oap->oap_count ==
+                                       CFS_PAGE_SIZE);
+               }
+       }
 
-        if (cmd & OBD_BRW_MEMALLOC)
-                mpflag = cfs_memory_pressure_get_and_set();
+       if (mem_tight)
+               mpflag = cfs_memory_pressure_get_and_set();
 
-        memset(&crattr, 0, sizeof crattr);
-        OBD_ALLOC(pga, sizeof(*pga) * page_count);
-        if (pga == NULL)
+       memset(&crattr, 0, sizeof crattr);
+       OBD_ALLOC(pga, sizeof(*pga) * page_count);
+       if (pga == NULL)
                GOTO(out, rc = -ENOMEM);
 
        OBDO_ALLOC(oa);
@@ -1962,16 +2033,18 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
                GOTO(out, rc = -ENOMEM);
 
        i = 0;
-       cfs_list_for_each_entry(oap, rpc_list, oap_rpc_item) {
-               struct cl_page *page = osc_oap2cl_page(oap);
+       cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
+               struct cl_page *page = oap2cl_page(oap);
                if (clerq == NULL) {
                        clerq = cl_req_alloc(env, page, crt,
                                             1 /* only 1-object rpcs for
                                                * now */);
                        if (IS_ERR(clerq))
                                GOTO(out, rc = PTR_ERR(clerq));
-                        lock = oap->oap_ldlm_lock;
-                }
+                       lock = oap->oap_ldlm_lock;
+               }
+               if (mem_tight)
+                       oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
                 pga[i] = &oap->oap_brw_page;
                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
@@ -1982,8 +2055,8 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
 
         /* always get the data for the obdo for the rpc */
        LASSERT(clerq != NULL);
-        crattr.cra_oa = oa;
-        crattr.cra_capa = NULL;
+       crattr.cra_oa = oa;
+       crattr.cra_capa = NULL;
        memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
         if (lock) {
@@ -1995,18 +2068,18 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
         if (rc != 0) {
                 CERROR("cl_req_prep failed: %d\n", rc);
                GOTO(out, rc);
-        }
+       }
 
-        sort_brw_pages(pga, page_count);
-        rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
-                                  pga, &req, crattr.cra_capa, 1, 0);
-        if (rc != 0) {
-                CERROR("prep_req failed: %d\n", rc);
+       sort_brw_pages(pga, page_count);
+       rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
+                       pga, &req, crattr.cra_capa, 1, 0);
+       if (rc != 0) {
+               CERROR("prep_req failed: %d\n", rc);
                GOTO(out, rc);
        }
 
        req->rq_interpret_reply = brw_interpret;
-        if (cmd & OBD_BRW_MEMALLOC)
+       if (mem_tight != 0)
                 req->rq_memalloc = 1;
 
         /* Need to update the timestamps after the request is built in case
@@ -2019,17 +2092,72 @@ int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
 
        lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
 
-        CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
-        aa = ptlrpc_req_async_args(req);
-        CFS_INIT_LIST_HEAD(&aa->aa_oaps);
-        cfs_list_splice(rpc_list, &aa->aa_oaps);
-        CFS_INIT_LIST_HEAD(rpc_list);
-        aa->aa_clerq = clerq;
+       CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
+       aa = ptlrpc_req_async_args(req);
+       CFS_INIT_LIST_HEAD(&aa->aa_oaps);
+       cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
+       CFS_INIT_LIST_HEAD(&aa->aa_exts);
+       cfs_list_splice_init(ext_list, &aa->aa_exts);
+       aa->aa_clerq = clerq;
+
+       /* queued sync pages can be torn down while the pages
+        * were between the pending list and the rpc */
+       tmp = NULL;
+       cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
+               /* only one oap gets a request reference */
+               if (tmp == NULL)
+                       tmp = oap;
+               if (oap->oap_interrupted && !req->rq_intr) {
+                       CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
+                                       oap, req);
+                       ptlrpc_mark_interrupted(req);
+               }
+       }
+       if (tmp != NULL)
+               tmp->oap_request = ptlrpc_request_addref(req);
+
+       client_obd_list_lock(&cli->cl_loi_list_lock);
+       starting_offset >>= CFS_PAGE_SHIFT;
+       if (cmd == OBD_BRW_READ) {
+               cli->cl_r_in_flight++;
+               lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
+               lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
+               lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
+                                     starting_offset + 1);
+       } else {
+               cli->cl_w_in_flight++;
+               lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
+               lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
+               lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
+                                     starting_offset + 1);
+       }
+       client_obd_list_unlock(&cli->cl_loi_list_lock);
+
+       DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
+                 page_count, aa, cli->cl_r_in_flight,
+                 cli->cl_w_in_flight);
+
+       /* XXX: Maybe the caller can check the RPC bulk descriptor to
+        * see which CPU/NUMA node the majority of pages were allocated
+        * on, and try to assign the async RPC to the CPU core
+        * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
+        *
+        * But on the other hand, we expect that multiple ptlrpcd
+        * threads and the initial write sponsor can run in parallel,
+        * especially when data checksum is enabled, which is CPU-bound
+        * operation and single ptlrpcd thread cannot process in time.
+        * So more ptlrpcd threads sharing BRW load
+        * (with PDL_POLICY_ROUND) seems better.
+        */
+       ptlrpcd_add_req(req, pol, -1);
+       rc = 0;
+       EXIT;
+
 out:
-        if (cmd & OBD_BRW_MEMALLOC)
-                cfs_memory_pressure_restore(mpflag);
+       if (mem_tight != 0)
+               cfs_memory_pressure_restore(mpflag);
 
-        capa_put(crattr.cra_capa);
+       capa_put(crattr.cra_capa);
        if (rc != 0) {
                LASSERT(req == NULL);
 
@@ -2039,59 +2167,14 @@ out:
                         OBD_FREE(pga, sizeof(*pga) * page_count);
                 /* this should happen rarely and is pretty bad, it makes the
                  * pending list not follow the dirty order */
-                client_obd_list_lock(&cli->cl_loi_list_lock);
-                cfs_list_for_each_entry_safe(oap, tmp, rpc_list, oap_rpc_item) {
-                        cfs_list_del_init(&oap->oap_rpc_item);
-
-                        /* queued sync pages can be torn down while the pages
-                         * were between the pending list and the rpc */
-                        if (oap->oap_interrupted) {
-                                CDEBUG(D_INODE, "oap %p interrupted\n", oap);
-                                osc_ap_completion(env, cli, NULL, oap, 0,
-                                                  oap->oap_count);
-                                continue;
-                        }
-                       osc_ap_completion(env, cli, NULL, oap, 0, rc);
+               while (!cfs_list_empty(ext_list)) {
+                       ext = cfs_list_entry(ext_list->next, struct osc_extent,
+                                            oe_link);
+                       cfs_list_del_init(&ext->oe_link);
+                       osc_extent_finish(env, ext, 0, rc);
                }
                if (clerq && !IS_ERR(clerq))
                        cl_req_completion(env, clerq, rc);
-       } else {
-               struct osc_async_page *tmp = NULL;
-
-               /* queued sync pages can be torn down while the pages
-                * were between the pending list and the rpc */
-               LASSERT(aa != NULL);
-               client_obd_list_lock(&cli->cl_loi_list_lock);
-               cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
-                       /* only one oap gets a request reference */
-                       if (tmp == NULL)
-                               tmp = oap;
-                       if (oap->oap_interrupted && !req->rq_intr) {
-                               CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
-                                               oap, req);
-                               ptlrpc_mark_interrupted(req);
-                       }
-               }
-               if (tmp != NULL)
-                       tmp->oap_request = ptlrpc_request_addref(req);
-
-               DEBUG_REQ(D_INODE,req, "%d pages, aa %p. now %dr/%dw in flight",
-                         page_count, aa, cli->cl_r_in_flight,
-                         cli->cl_w_in_flight);
-
-               /* XXX: Maybe the caller can check the RPC bulk descriptor to
-                * see which CPU/NUMA node the majority of pages were allocated
-                * on, and try to assign the async RPC to the CPU core
-                * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
-                *
-                * But on the other hand, we expect that multiple ptlrpcd
-                * threads and the initial write sponsor can run in parallel,
-                * especially when data checksum is enabled, which is CPU-bound
-                * operation and single ptlrpcd thread cannot process in time.
-                * So more ptlrpcd threads sharing BRW load
-                * (with PDL_POLICY_ROUND) seems better.
-                */
-               ptlrpcd_add_req(req, pol, -1);
        }
        RETURN(rc);
 }
@@ -3265,15 +3348,12 @@ static int osc_reconnect(const struct lu_env *env,
                 cli->cl_lost_grant = 0;
                 client_obd_list_unlock(&cli->cl_loi_list_lock);
 
-                CDEBUG(D_CACHE, "request ocd_grant: %d cl_avail_grant: %ld "
-                       "cl_dirty: %ld cl_lost_grant: %ld\n", data->ocd_grant,
-                       cli->cl_avail_grant, cli->cl_dirty, lost_grant);
                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
-                       " ocd_grant: %d\n", data->ocd_connect_flags,
-                       data->ocd_version, data->ocd_grant);
-        }
+                      " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
+                      data->ocd_version, data->ocd_grant, lost_grant);
+       }
 
-        RETURN(0);
+       RETURN(0);
 }
 
 static int osc_disconnect(struct obd_export *exp)
@@ -3287,7 +3367,7 @@ static int osc_disconnect(struct obd_export *exp)
                 if (obd->u.cli.cl_conn_count == 1) {
                         /* Flush any remaining cancel messages out to the
                          * target */
-                        llog_sync(ctxt, exp);
+                       llog_sync(ctxt, exp, 0);
                 }
                 llog_ctxt_put(ctxt);
         } else {
@@ -3358,11 +3438,9 @@ static int osc_import_event(struct obd_device *obd,
                 if (!IS_ERR(env)) {
                         /* Reset grants */
                         cli = &obd->u.cli;
-                        client_obd_list_lock(&cli->cl_loi_list_lock);
                         /* all pages go to failing rpcs due to the invalid
                          * import */
                        osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
-                        client_obd_list_unlock(&cli->cl_loi_list_lock);
 
                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
                         cl_env_put(env, &refcheck);
@@ -3443,9 +3521,7 @@ static int brw_queue_work(const struct lu_env *env, void *data)
 
        CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
 
-       client_obd_list_lock(&cli->cl_loi_list_lock);
        osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
-       client_obd_list_unlock(&cli->cl_loi_list_lock);
        RETURN(0);
 }