Whamcloud - gitweb
- landing of b_hd_cleanup_merge to HEAD.
[fs/lustre-release.git] / lustre / osc / osc_request.c
index b2172ce..fbd698f 100644 (file)
@@ -492,6 +492,9 @@ static int osc_destroy(struct obd_export *exp, struct obdo *oa,
         request->rq_replen = lustre_msg_size(1, &size);
 
         rc = ptlrpc_queue_wait(request);
+        
+        if (rc == -ENOENT)
+                rc = 0;
         if (rc)
                 GOTO(out, rc);
 
@@ -540,6 +543,11 @@ static void osc_consume_write_grant(struct client_obd *cli,
         LASSERT(cli->cl_avail_grant >= 0);
 }
 
+static unsigned long rpcs_in_flight(struct client_obd *cli)
+{
+        return cli->cl_r_in_flight + cli->cl_w_in_flight;
+}
+
 /* caller must hold loi_list_lock */
 void osc_wake_cache_waiters(struct client_obd *cli)
 {
@@ -556,12 +564,10 @@ void osc_wake_cache_waiters(struct client_obd *cli)
 
                 /* if still dirty cache but no grant wait for pending RPCs that
                  * may yet return us some grant before doing sync writes */
-                if (cli->cl_brw_in_flight && cli->cl_avail_grant < PAGE_SIZE) {
-                        CDEBUG(D_CACHE, "%d BRWs in flight, no grant\n",
-                               cli->cl_brw_in_flight);
-                        return;
+                if (cli->cl_w_in_flight && cli->cl_avail_grant < PAGE_SIZE) {
+                        CDEBUG(D_CACHE, "%u BRW writes in flight, no grant\n",
+                               cli->cl_w_in_flight);
                 }
-
                 ocw = list_entry(l, struct osc_cache_waiter, ocw_entry);
                 list_del_init(&ocw->ocw_entry);
                 if (cli->cl_avail_grant < PAGE_SIZE) {
@@ -602,7 +608,7 @@ static void handle_short_read(int nob_read, obd_count page_count,
 
                 if (pga->count > nob_read) {
                         /* EOF inside this page */
-                        ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
+                        ptr = kmap(pga->pg) + (pga->page_offset & ~PAGE_MASK);
                         memset(ptr + nob_read, 0, pga->count - nob_read);
                         kunmap(pga->pg);
                         page_count--;
@@ -617,7 +623,7 @@ static void handle_short_read(int nob_read, obd_count page_count,
 
         /* zero remaining pages */
         while (page_count-- > 0) {
-                ptr = kmap(pga->pg) + (pga->off & ~PAGE_MASK);
+                ptr = kmap(pga->pg) + (pga->page_offset & ~PAGE_MASK);
                 memset(ptr, 0, pga->count);
                 kunmap(pga->pg);
                 pga++;
@@ -674,7 +680,7 @@ static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
                 return 0;
         }
 
-        return (p1->off + p1->count == p2->off);
+        return (p1->disk_offset + p1->count == p2->disk_offset);
 }
 
 #if CHECKSUM_BULK
@@ -753,29 +759,32 @@ static int osc_brw_prep_request(int cmd, struct obd_import *imp,struct obdo *oa,
         ioobj->ioo_bufcnt = niocount;
 
         LASSERT (page_count > 0);
+
         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
                 struct brw_page *pg = &pga[i];
                 struct brw_page *pg_prev = pg - 1;
 
                 LASSERT(pg->count > 0);
-                LASSERT((pg->off & ~PAGE_MASK)+ pg->count <= PAGE_SIZE);
-                LASSERTF(i == 0 || pg->off > pg_prev->off,
+                LASSERTF((pg->page_offset & ~PAGE_MASK)+ pg->count <= PAGE_SIZE,
+                         "i: %d pg: %p pg_off: "LPU64", count: %u\n", i, pg,
+                         pg->page_offset, pg->count);
+                LASSERTF(i == 0 || pg->disk_offset > pg_prev->disk_offset,
                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
                          i, page_count,
-                         pg->pg, pg->pg->private, pg->pg->index, pg->off,
+                         pg->pg, pg->pg->private, pg->pg->index, pg->disk_offset,
                          pg_prev->pg, pg_prev->pg->private, pg_prev->pg->index,
-                         pg_prev->off);
+                         pg_prev->disk_offset);
 
                 ptlrpc_prep_bulk_page(desc, pg->pg,
-                                      pg->off & ~PAGE_MASK, pg->count);
+                                      pg->page_offset & ~PAGE_MASK, pg->count);
                 requested_nob += pg->count;
 
                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
                         niobuf--;
                         niobuf->len += pg->count;
                 } else {
-                        niobuf->offset = pg->off;
+                        niobuf->offset = pg->disk_offset;
                         niobuf->len    = pg->count;
                         niobuf->flags  = pg->flag;
                 }
@@ -993,8 +1002,8 @@ static void sort_brw_pages(struct brw_page *array, int num)
                 for (i = stride ; i < num ; i++) {
                         tmp = array[i];
                         j = i;
-                        while (j >= stride && array[j - stride].off >
-                                tmp.off) {
+                        while (j >= stride && array[j - stride].disk_offset >
+                                tmp.disk_offset) {
                                 array[j] = array[j - stride];
                                 j -= stride;
                         }
@@ -1193,9 +1202,10 @@ static int brw_interpret_oap(struct ptlrpc_request *request,
         struct osc_async_page *oap;
         struct client_obd *cli;
         struct list_head *pos, *n;
+        struct timeval now;
         ENTRY;
 
-
+        do_gettimeofday(&now);
         rc = osc_brw_fini_request(request, aa->aa_oa, aa->aa_requested_nob,
                                   aa->aa_nio_count, aa->aa_page_count,
                                   aa->aa_pga, rc);
@@ -1215,10 +1225,22 @@ static int brw_interpret_oap(struct ptlrpc_request *request,
 
         spin_lock(&cli->cl_loi_list_lock);
 
+        if (request->rq_reqmsg->opc == OST_WRITE)
+                lprocfs_stime_record(&cli->cl_write_stime, &now,
+                                     &request->rq_rpcd_start);
+        else
+                lprocfs_stime_record(&cli->cl_read_stime, &now,
+                                     &request->rq_rpcd_start);
+
+
+
         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
          * is called so we know whether to go to sync BRWs or wait for more
          * RPCs to complete */
-        cli->cl_brw_in_flight--;
+        if (request->rq_reqmsg->opc == OST_WRITE)
+                cli->cl_w_in_flight--;
+        else
+                cli->cl_r_in_flight--;
 
         /* the caller may re-use the oap after the completion call so
          * we need to clean it up a little */
@@ -1276,7 +1298,8 @@ static struct ptlrpc_request *osc_build_req(struct client_obd *cli,
                         ops = oap->oap_caller_ops;
                         caller_data = oap->oap_caller_data;
                 }
-                pga[i].off = oap->oap_obj_off + oap->oap_page_off;
+                pga[i].disk_offset = oap->oap_obj_off + oap->oap_page_off;
+                pga[i].page_offset = pga[i].disk_offset;
                 pga[i].pg = oap->oap_page;
                 pga[i].count = oap->oap_count;
                 pga[i].flag = oap->oap_brw_flags;
@@ -1461,17 +1484,20 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
 #ifdef __KERNEL__
         if (cmd == OBD_BRW_READ) {
                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
-                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_brw_in_flight);
+                lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
         } else {
                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
                 lprocfs_oh_tally(&cli->cl_write_rpc_hist,
-                                 cli->cl_brw_in_flight);
+                                 cli->cl_w_in_flight);
         }
 #endif
 
         spin_lock(&cli->cl_loi_list_lock);
 
-        cli->cl_brw_in_flight++;
+        if (cmd == OBD_BRW_READ)
+                cli->cl_r_in_flight++;
+        else
+                cli->cl_w_in_flight++;
         /* queued sync pages can be torn down while the pages
          * were between the pending list and the rpc */
         list_for_each(pos, &aa->aa_oaps) {
@@ -1484,8 +1510,9 @@ static int osc_send_oap_rpc(struct client_obd *cli, struct lov_oinfo *loi,
                 }
         }
 
-        CDEBUG(D_INODE, "req %p: %d pages, aa %p.  now %d in flight\n", request,
-               page_count, aa, cli->cl_brw_in_flight);
+        CDEBUG(D_INODE, "req %p: %d pages, aa %p.  now %dr/%dw in flight\n",
+                        request, page_count, aa, cli->cl_r_in_flight,
+                        cli->cl_w_in_flight);
 
         oap->oap_request = ptlrpc_request_addref(request);
         request->rq_interpret_reply = brw_interpret_oap;
@@ -1609,9 +1636,9 @@ static void osc_check_rpcs(struct client_obd *cli)
         ENTRY;
 
         while ((loi = osc_next_loi(cli)) != NULL) {
-                LOI_DEBUG(loi, "%d in flight\n", cli->cl_brw_in_flight);
-
-                if (cli->cl_brw_in_flight >= cli->cl_max_rpcs_in_flight)
+                LOI_DEBUG(loi, "%lu in flight\n", rpcs_in_flight(cli));
+                
+                if (rpcs_in_flight(cli) >= cli->cl_max_rpcs_in_flight)                        
                         break;
 
                 /* attempt some read/write balancing by alternating between
@@ -1676,7 +1703,7 @@ static int ocw_granted(struct client_obd *cli, struct osc_cache_waiter *ocw)
         int rc;
         ENTRY;
         spin_lock(&cli->cl_loi_list_lock);
-        rc = list_empty(&ocw->ocw_entry) || cli->cl_brw_in_flight == 0;
+        rc = list_empty(&ocw->ocw_entry) || rpcs_in_flight(cli) == 0;
         spin_unlock(&cli->cl_loi_list_lock);
         RETURN(rc);
 };
@@ -1688,6 +1715,7 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
 {
         struct osc_cache_waiter ocw;
         struct l_wait_info lwi = { 0 };
+        struct timeval start, stop;
 
         CDEBUG(D_CACHE, "dirty: %ld dirty_max: %ld dropped: %lu grant: %lu\n",
                cli->cl_dirty, cli->cl_dirty_max, cli->cl_lost_grant,
@@ -1707,7 +1735,7 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
         /* Make sure that there are write rpcs in flight to wait for.  This
          * is a little silly as this object may not have any pending but
          * other objects sure might. */
-        if (cli->cl_brw_in_flight) {
+        if (cli->cl_w_in_flight) {                
                 list_add_tail(&ocw.ocw_entry, &cli->cl_cache_waiters);
                 init_waitqueue_head(&ocw.ocw_waitq);
                 ocw.ocw_oap = oap;
@@ -1718,9 +1746,11 @@ static int osc_enter_cache(struct client_obd *cli, struct lov_oinfo *loi,
                 spin_unlock(&cli->cl_loi_list_lock);
 
                 CDEBUG(0, "sleeping for cache space\n");
+                do_gettimeofday(&start);
                 l_wait_event(ocw.ocw_waitq, ocw_granted(cli, &ocw), &lwi);
-
+                do_gettimeofday(&stop);
                 spin_lock(&cli->cl_loi_list_lock);
+                lprocfs_stime_record(&cli->cl_enter_stime, &stop, &start);
                 if (!list_empty(&ocw.ocw_entry)) {
                         list_del(&ocw.ocw_entry);
                         RETURN(-EINTR);
@@ -2091,9 +2121,9 @@ static int sanosc_brw_read(struct obd_export *exp, struct obdo *oa,
         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                 LASSERT(PageLocked(pga[mapped].pg));
                 LASSERT(mapped == 0 ||
-                        pga[mapped].off > pga[mapped - 1].off);
+                        pga[mapped].disk_offset > pga[mapped - 1].disk_offset);
 
-                nioptr->offset = pga[mapped].off;
+                nioptr->offset = pga[mapped].disk_offset;
                 nioptr->len    = pga[mapped].count;
                 nioptr->flags  = pga[mapped].flag;
         }
@@ -2221,9 +2251,9 @@ static int sanosc_brw_write(struct obd_export *exp, struct obdo *oa,
         for (mapped = 0; mapped < page_count; mapped++, nioptr++) {
                 LASSERT(PageLocked(pga[mapped].pg));
                 LASSERT(mapped == 0 ||
-                        pga[mapped].off > pga[mapped - 1].off);
+                        pga[mapped].disk_offset > pga[mapped - 1].disk_offset);
 
-                nioptr->offset = pga[mapped].off;
+                nioptr->offset = pga[mapped].disk_offset;
                 nioptr->len    = pga[mapped].count;
                 nioptr->flags  = pga[mapped].flag;
         }
@@ -2336,7 +2366,11 @@ static void osc_set_data_with_check(struct lustre_handle *lockh, void *data)
 {
         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
 
-        LASSERT(lock != NULL);
+        if (lock == NULL) {
+                CERROR("lockh %p, data %p - client evicted?\n", lockh, data);
+                return;
+        }
+
         l_lock(&lock->l_resource->lr_namespace->ns_lock);
 #ifdef __KERNEL__
         if (lock->l_ast_data && lock->l_ast_data != data) {
@@ -2376,6 +2410,8 @@ static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
         struct obd_device *obd = exp->exp_obd;
         struct ldlm_res_id res_id = { .name = {0} };
         struct ost_lvb lvb;
+        struct ldlm_reply *rep;
+        struct ptlrpc_request *req = NULL;
         int rc;
         ENTRY;
 
@@ -2431,9 +2467,32 @@ static int osc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
         }
 
  no_match:
-        rc = ldlm_cli_enqueue(exp, NULL, obd->obd_namespace, res_id, type,
+        if (*flags & LDLM_FL_HAS_INTENT) {
+                int size[2] = {0, sizeof(struct ldlm_request)};
+
+                req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_DLM_VERSION,
+                                      LDLM_ENQUEUE, 2, size, NULL);
+                if (req == NULL)
+                        RETURN(-ENOMEM);
+
+                size[0] = sizeof(*rep);
+                size[1] = sizeof(lvb);
+                req->rq_replen = lustre_msg_size(2, size);
+        }
+        rc = ldlm_cli_enqueue(exp, req, obd->obd_namespace, res_id, type,
                               policy, mode, flags, bl_cb, cp_cb, gl_cb, data,
                               &lvb, sizeof(lvb), lustre_swab_ost_lvb, lockh);
+        if (req != NULL) {
+                if (rc == ELDLM_LOCK_ABORTED) {
+                        /* swabbed by ldlm_cli_enqueue() */
+                        LASSERT_REPSWABBED(req, 0);
+                        rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*rep));
+                        LASSERT(rep != NULL);
+                        if (rep->lock_policy_res1)
+                                rc = rep->lock_policy_res1;
+                }
+                ptlrpc_req_finished(req);
+        }
 
         if ((*flags & LDLM_FL_HAS_INTENT && rc == ELDLM_LOCK_ABORTED) || !rc) {
                 CDEBUG(D_INODE, "received kms == "LPU64", blocks == "LPU64"\n",
@@ -2468,7 +2527,7 @@ static int osc_match(struct obd_export *exp, struct lov_stripe_md *lsm,
         rc = ldlm_lock_match(obd->obd_namespace, *flags, &res_id, type,
                              policy, mode, lockh);
         if (rc) {
-                if (!(*flags & LDLM_FL_TEST_LOCK))
+               // if (!(*flags & LDLM_FL_TEST_LOCK))
                         osc_set_data_with_check(lockh, data);
                 RETURN(rc);
         }
@@ -2804,9 +2863,11 @@ static int osc_set_info(struct obd_export *exp, obd_count keylen,
 
         ctxt = llog_get_context(&exp->exp_obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT);
         if (ctxt) {
-                rc = llog_initiator_connect(ctxt);
-                if (rc)
-                        RETURN(rc);
+                if (rc == 0)
+                        rc = llog_initiator_connect(ctxt);
+                else
+                        CERROR("cannot establish the connect for ctxt %p: %d\n",
+                               ctxt, rc);
         }
 
         imp->imp_server_timeout = 1;
@@ -2929,6 +2990,15 @@ static int osc_import_event(struct obd_device *obd,
                 break;
         }
         case IMP_EVENT_ACTIVE: {
+                /* Only do this on the MDS OSC's */
+                if (imp->imp_server_timeout) {
+                        struct osc_creator *oscc = &obd->u.cli.cl_oscc;
+
+                        spin_lock(&oscc->oscc_lock);
+                        oscc->oscc_flags &= ~OSCC_FLAG_NOSPC;
+                        spin_unlock(&oscc->oscc_lock);
+                }
+
                 if (obd->obd_observer)
                         rc = obd_notify(obd->obd_observer, obd, 1, 0);
                 break;
@@ -3004,6 +3074,8 @@ struct obd_ops osc_obd_ops = {
         .o_detach               = osc_detach,
         .o_setup                = osc_setup,
         .o_cleanup              = osc_cleanup,
+        .o_add_conn             = client_import_add_conn,
+        .o_del_conn             = client_import_del_conn,
         .o_connect              = osc_connect,
         .o_disconnect           = osc_disconnect,
         .o_statfs               = osc_statfs,
@@ -3043,6 +3115,8 @@ struct obd_ops sanosc_obd_ops = {
         .o_attach               = osc_attach,
         .o_detach               = osc_detach,
         .o_cleanup              = client_obd_cleanup,
+        .o_add_conn             = client_import_add_conn,
+        .o_del_conn             = client_import_del_conn,
         .o_connect              = osc_connect,
         .o_disconnect           = client_disconnect_export,
         .o_statfs               = osc_statfs,