Whamcloud - gitweb
Land b_smallfix onto HEAD (20040213_1402)
[fs/lustre-release.git] / lustre / ldlm / ldlm_lib.c
index dd7abc8..9d4934e 100644 (file)
@@ -91,17 +91,20 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
                 RETURN(-EINVAL);
         }
 
+
         sema_init(&cli->cl_sem, 1);
         cli->cl_conn_count = 0;
         memcpy(server_uuid.uuid, lcfg->lcfg_inlbuf2, MIN(lcfg->lcfg_inllen2,
                                                         sizeof(server_uuid)));
 
+        init_MUTEX(&cli->cl_dirty_sem);
         cli->cl_dirty = 0;
         cli->cl_dirty_granted = 0;
-        cli->cl_dirty_max = 64*1024*1024; /* some default */
+        cli->cl_dirty_max = OSC_MAX_DIRTY_DEFAULT * 1024 * 1024;
         cli->cl_ost_can_grant = 1;
         INIT_LIST_HEAD(&cli->cl_cache_waiters);
         INIT_LIST_HEAD(&cli->cl_loi_ready_list);
+        INIT_LIST_HEAD(&cli->cl_loi_write_list);
         spin_lock_init(&cli->cl_loi_list_lock);
         cli->cl_brw_in_flight = 0;
         spin_lock_init(&cli->cl_read_rpc_hist.oh_lock);
@@ -109,7 +112,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
         spin_lock_init(&cli->cl_read_page_hist.oh_lock);
         spin_lock_init(&cli->cl_write_page_hist.oh_lock);
         cli->cl_max_pages_per_rpc = PTL_MD_MAX_PAGES;
-        cli->cl_max_rpcs_in_flight = 8;
+        cli->cl_max_rpcs_in_flight = OSC_MAX_RIF_DEFAULT;
 
         ldlm_get_ref();
         if (rc) {
@@ -134,6 +137,7 @@ int client_obd_setup(struct obd_device *obddev, obd_count len, void *buf)
         imp->imp_obd = obddev;
         imp->imp_connect_op = connect_op;
         imp->imp_generation = 0;
+        imp->imp_initial_recov = 1;
         INIT_LIST_HEAD(&imp->imp_pinger_chain);
         memcpy(imp->imp_target_uuid.uuid, lcfg->lcfg_inlbuf1,
               lcfg->lcfg_inllen1);
@@ -243,27 +247,17 @@ int client_connect_import(struct lustre_handle *dlm_handle,
                 GOTO(out_disco, rc = -ENOMEM);
 
         imp->imp_dlm_handle = *dlm_handle;
-        imp->imp_state = LUSTRE_IMP_DISCON;
+        rc = ptlrpc_init_import(imp);
+        if (rc != 0) 
+                GOTO(out_ldlm, rc);
 
-        rc = ptlrpc_connect_import(imp);
+        exp->exp_connection = ptlrpc_connection_addref(imp->imp_connection);
+        rc = ptlrpc_connect_import(imp, NULL);
         if (rc != 0) {
                 LASSERT (imp->imp_state == LUSTRE_IMP_DISCON);
                 GOTO(out_ldlm, rc);
         }
 
-        LASSERT (imp->imp_state == LUSTRE_IMP_FULL);
-
-        exp->exp_connection = ptlrpc_connection_addref(imp->imp_connection);
-
-        if (imp->imp_replayable) {
-                CDEBUG(D_HA, "connected to replayable target: %s\n",
-                       imp->imp_target_uuid.uuid);
-                ptlrpc_pinger_add_import(imp);
-        }
-
-        CDEBUG(D_HA, "local import: %p, remote handle: "LPX64"\n", imp,
-               imp->imp_remote_handle.cookie);
-
         EXIT;
 
         if (rc) {
@@ -325,8 +319,6 @@ int client_disconnect_export(struct obd_export *exp, int failover)
         else
                 rc = ptlrpc_disconnect_import(imp);
 
-        imp->imp_state = LUSTRE_IMP_NEW;
-
         EXIT;
  out_no_disconnect:
         err = class_disconnect(exp, 0);
@@ -507,13 +499,10 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         export = req->rq_export = class_conn2export(&conn);
         LASSERT(export != NULL);
 
-        if (req->rq_connection != NULL)
-                ptlrpc_put_connection(req->rq_connection);
         if (export->exp_connection != NULL)
                 ptlrpc_put_connection(export->exp_connection);
         export->exp_connection = ptlrpc_get_connection(&req->rq_peer,
                                                        &remote_uuid);
-        req->rq_connection = ptlrpc_connection_addref(export->exp_connection);
 
         LASSERT(export->exp_conn_cnt < req->rq_reqmsg->conn_cnt);
         export->exp_conn_cnt = req->rq_reqmsg->conn_cnt;
@@ -530,7 +519,7 @@ int target_handle_connect(struct ptlrpc_request *req, svc_handler_t handler)
         if (export->exp_imp_reverse != NULL)
                 class_destroy_import(export->exp_imp_reverse);
         revimp = export->exp_imp_reverse = class_new_import();
-        revimp->imp_connection = ptlrpc_connection_addref(req->rq_connection);
+        revimp->imp_connection = ptlrpc_connection_addref(export->exp_connection);
         revimp->imp_client = &export->exp_obd->obd_ldlm_client;
         revimp->imp_remote_handle = conn;
         revimp->imp_obd = target;
@@ -545,6 +534,7 @@ out:
 
 int target_handle_disconnect(struct ptlrpc_request *req)
 {
+        struct obd_export *exp;
         int rc;
         ENTRY;
 
@@ -552,8 +542,9 @@ int target_handle_disconnect(struct ptlrpc_request *req)
         if (rc)
                 RETURN(rc);
 
-        req->rq_status = obd_disconnect(req->rq_export, 0);
-        req->rq_export = NULL;
+        /* keep the rq_export around so we can send the reply */
+        exp = class_export_get(req->rq_export);
+        req->rq_status = obd_disconnect(exp, 0);
         RETURN(0);
 }
 
@@ -566,18 +557,14 @@ void target_destroy_export(struct obd_export *exp)
 
         /* We cancel locks at disconnect time, but this will catch any locks
          * granted in a race with recovery-induced disconnect. */
-        ldlm_cancel_locks_for_export(exp);
+        if (exp->exp_obd->obd_namespace != NULL)
+                ldlm_cancel_locks_for_export(exp);
 }
 
 /*
  * Recovery functions
  */
 
-void target_cancel_recovery_timer(struct obd_device *obd)
-{
-        del_timer(&obd->obd_recovery_timer);
-}
-
 static void abort_delayed_replies(struct obd_device *obd)
 {
         struct ptlrpc_request *req;
@@ -588,6 +575,7 @@ static void abort_delayed_replies(struct obd_device *obd)
                 req->rq_status = -ENOTCONN;
                 req->rq_type = PTL_RPC_MSG_ERR;
                 ptlrpc_reply(req);
+                class_export_put(req->rq_export);
                 list_del(&req->rq_list);
                 OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
                 OBD_FREE(req, sizeof *req);
@@ -640,7 +628,7 @@ void target_abort_recovery(void *data)
 
         class_disconnect_exports(obd, 0);
 
-        /* when recovery was abort, cleanup orphans for mds */
+        /* when recovery was aborted, cleanup orphans on mds and ost */
         if (OBT(obd) && OBP(obd, postrecov)) {
                 rc = OBP(obd, postrecov)(obd);
                 if (rc >= 0)
@@ -664,18 +652,25 @@ static void target_recovery_expired(unsigned long castmeharder)
         spin_unlock_bh(&obd->obd_processing_task_lock);
 }
 
-static void reset_recovery_timer(struct obd_device *obd)
+
+/* obd_processing_task_lock should be held */
+void target_cancel_recovery_timer(struct obd_device *obd)
 {
-        int recovering;
-        spin_lock(&obd->obd_dev_lock);
-        recovering = obd->obd_recovering;
-        spin_unlock(&obd->obd_dev_lock);
+        CDEBUG(D_HA, "%s: cancel recovery timer\n", obd->obd_name);
+        del_timer(&obd->obd_recovery_timer);
+}
 
-        if (!recovering)
+static void reset_recovery_timer(struct obd_device *obd)
+{
+        spin_lock_bh(&obd->obd_processing_task_lock);
+        if (!obd->obd_recovering) {
+                spin_unlock_bh(&obd->obd_processing_task_lock);
                 return;
+        }                
         CDEBUG(D_HA, "timer will expire in %u seconds\n",
                OBD_RECOVERY_TIMEOUT / HZ);
         mod_timer(&obd->obd_recovery_timer, jiffies + OBD_RECOVERY_TIMEOUT);
+        spin_unlock_bh(&obd->obd_processing_task_lock);
 }
 
 
@@ -914,6 +909,7 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
         memcpy(reqmsg, req->rq_reqmsg, req->rq_reqlen);
         req = saved_req;
         req->rq_reqmsg = reqmsg;
+        class_export_get(req->rq_export);
         list_add(&req->rq_list, &obd->obd_delayed_reply_queue);
 
         spin_lock_bh(&obd->obd_processing_task_lock);
@@ -926,14 +922,17 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
                 ldlm_reprocess_all_ns(req->rq_export->exp_obd->obd_namespace);
                 CWARN("%s: all clients recovered, sending delayed replies\n",
                        obd->obd_name);
+                spin_lock_bh(&obd->obd_processing_task_lock);
                 obd->obd_recovering = 0;
+                target_cancel_recovery_timer(obd);
+                spin_unlock_bh(&obd->obd_processing_task_lock);
 
-                /* when recovering finished, cleanup orphans for mds */
+                /* when recovery finished, cleanup orphans on mds and ost */
                 if (OBT(obd) && OBP(obd, postrecov)) {
                         rc2 = OBP(obd, postrecov)(obd);
                         if (rc2 >= 0)
-                                CWARN("%s: all clients recovered, %d MDS orphans "
-                                       "deleted\n", obd->obd_name, rc2);
+                                CWARN("%s: all clients recovered, %d MDS "
+                                      "orphans deleted\n", obd->obd_name, rc2);
                         else
                                 CERROR("postrecov failed %d\n", rc2);
                 }
@@ -942,13 +941,14 @@ int target_queue_final_reply(struct ptlrpc_request *req, int rc)
                         req = list_entry(tmp, struct ptlrpc_request, rq_list);
                         DEBUG_REQ(D_ERROR, req, "delayed:");
                         ptlrpc_reply(req);
+                        class_export_put(req->rq_export);
                         list_del(&req->rq_list);
                         OBD_FREE(req->rq_reqmsg, req->rq_reqlen);
                         OBD_FREE(req, sizeof *req);
                 }
-                target_cancel_recovery_timer(obd);
+                ptlrpc_run_recovery_over_upcall(obd);
         } else {
-                CERROR("%s: %d recoverable clients remain\n",
+                CWARN("%s: %d recoverable clients remain\n",
                        obd->obd_name, obd->obd_recoverable_clients);
                 wake_up(&obd->obd_next_transno_waitq);
         }
@@ -1123,6 +1123,8 @@ void *ldlm_put_lock_into_req(struct ptlrpc_request *req,
         for (i = 0; i < REQ_MAX_ACK_LOCKS; i++) {
                 if (req->rq_ack_locks[i].mode)
                         continue;
+                CDEBUG(D_HA, "saving lock "LPX64" in req %p ack_lock[%d]\n",
+                       lock->cookie, req, i);
                 memcpy(&req->rq_ack_locks[i].lock, lock, sizeof(*lock));
                 req->rq_ack_locks[i].mode = mode;
                 return &req->rq_ack_locks[i];
@@ -1131,4 +1133,3 @@ void *ldlm_put_lock_into_req(struct ptlrpc_request *req,
         LBUG();
         return NULL;
 }
-