Whamcloud - gitweb
LU-1214 ptlrpc: splits server-side connection/bulkIO/recovery
[fs/lustre-release.git] / lustre / ptlrpc / target.c
index a1961b0..6f8bccf 100644 (file)
@@ -28,6 +28,8 @@
 /*
  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
+ *
+ * Copyright (c) 2011, 2012, Whamcloud, Inc.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -41,6 +43,7 @@
 
 #include <obd.h>
 #include <lustre_fsfilt.h>
+#include <obd_class.h>
 
 /**
  * Update client data in last_rcvd file. An obd API
@@ -122,9 +125,9 @@ static void obt_boot_epoch_update(struct lu_target *lut)
         cfs_spin_unlock(&lut->lut_translock);
 
         CFS_INIT_LIST_HEAD(&client_list);
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_lock(&obd->obd_recovery_task_lock);
         cfs_list_splice_init(&obd->obd_final_req_queue, &client_list);
-        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_unlock(&obd->obd_recovery_task_lock);
 
         /**
          * go through list of exports participated in recovery and
@@ -135,9 +138,9 @@ static void obt_boot_epoch_update(struct lu_target *lut)
                 obt_client_epoch_update(req->rq_export);
         }
         /** return list back at once */
-        cfs_spin_lock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_lock(&obd->obd_recovery_task_lock);
         cfs_list_splice_init(&client_list, &obd->obd_final_req_queue);
-        cfs_spin_unlock_bh(&obd->obd_processing_task_lock);
+        cfs_spin_unlock(&obd->obd_recovery_task_lock);
         obt_server_data_update(lut, 1);
 }
 
@@ -148,19 +151,25 @@ static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut,
                                const struct lu_buf *buf, loff_t *off, int sync)
 {
         struct thandle *th;
-        struct txn_param p;
-        int rc, credits;
+        int rc;
         ENTRY;
 
-        credits = lut->lut_bottom->dd_ops->dt_credit_get(env, lut->lut_bottom,
-                                                         DTO_WRITE_BLOCK);
-        txn_param_init(&p, credits);
-
-        th = dt_trans_start(env, lut->lut_bottom, &p);
+        th = dt_trans_create(env, lut->lut_bottom);
         if (IS_ERR(th))
                 RETURN(PTR_ERR(th));
 
+        rc = dt_declare_record_write(env, lut->lut_last_rcvd,
+                                     buf->lb_len, *off, th);
+        if (rc)
+                goto stop;
+
+        rc = dt_trans_start(env, lut->lut_bottom, th);
+        if (rc)
+                goto stop;
+
         rc = dt_record_write(env, lut->lut_last_rcvd, buf, off, th);
+
+stop:
         dt_trans_stop(env, lut->lut_bottom, th);
 
         CDEBUG(D_INFO, "write last_rcvd header rc = %d:\n"
@@ -175,6 +184,8 @@ static int lut_last_rcvd_write(const struct lu_env *env, struct lu_target *lut,
  */
 int lut_client_alloc(struct obd_export *exp)
 {
+        LASSERT(exp != exp->exp_obd->obd_self_export);
+
         OBD_ALLOC_PTR(exp->exp_target_data.ted_lcd);
         if (exp->exp_target_data.ted_lcd == NULL)
                 RETURN(-ENOMEM);
@@ -192,6 +203,8 @@ void lut_client_free(struct obd_export *exp)
         struct tg_export_data *ted = &exp->exp_target_data;
         struct lu_target *lut = class_exp2tgt(exp);
 
+        LASSERT(exp != exp->exp_obd->obd_self_export);
+
         OBD_FREE_PTR(ted->ted_lcd);
         ted->ted_lcd = NULL;
 
@@ -234,8 +247,8 @@ int lut_client_data_update(const struct lu_env *env, struct obd_export *exp)
 /**
  * Update server data in last_rcvd
  */
-static int lut_server_data_update(const struct lu_env *env,
-                                  struct lu_target *lut, int sync)
+int lut_server_data_update(const struct lu_env *env,
+                           struct lu_target *lut, int sync)
 {
         struct lr_server_data tmp_lsd;
         loff_t tmp_off = 0;
@@ -248,7 +261,7 @@ static int lut_server_data_update(const struct lu_env *env,
 
         CDEBUG(D_SUPER,
                "%s: mount_count is "LPU64", last_transno is "LPU64"\n",
-               lut->lut_lsd.lsd_uuid, lut->lut_mount_count,
+               lut->lut_lsd.lsd_uuid, lut->lut_obd->u.obt.obt_mount_count,
                lut->lut_last_transno);
 
         cfs_spin_lock(&lut->lut_translock);
@@ -293,7 +306,7 @@ void lut_boot_epoch_update(struct lu_target *lut)
 
         rc = lu_env_init(&env, LCT_DT_THREAD);
         if (rc) {
-                CERROR("Can't initialize environment rc=%i\n", rc);
+                CERROR("Can't initialize environment rc=%d\n", rc);
                 return;
         }
 
@@ -308,9 +321,9 @@ void lut_boot_epoch_update(struct lu_target *lut)
          * The recovery is not yet finished and final queue can still be updated
          * with resend requests. Move final list to separate one for processing
          */
-        cfs_spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
+        cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
         cfs_list_splice_init(&lut->lut_obd->obd_final_req_queue, &client_list);
-        cfs_spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock);
+        cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
 
         /**
          * go through list of exports participated in recovery and
@@ -322,9 +335,9 @@ void lut_boot_epoch_update(struct lu_target *lut)
                         lut_client_epoch_update(&env, req->rq_export);
         }
         /** return list back at once */
-        cfs_spin_lock_bh(&lut->lut_obd->obd_processing_task_lock);
+        cfs_spin_lock(&lut->lut_obd->obd_recovery_task_lock);
         cfs_list_splice_init(&client_list, &lut->lut_obd->obd_final_req_queue);
-        cfs_spin_unlock_bh(&lut->lut_obd->obd_processing_task_lock);
+        cfs_spin_unlock(&lut->lut_obd->obd_recovery_task_lock);
         /** update server epoch */
         lut_server_data_update(&env, lut, 1);
         lu_env_fini(&env);
@@ -334,37 +347,115 @@ EXPORT_SYMBOL(lut_boot_epoch_update);
 /**
  * commit callback, need to update last_commited value
  */
-void lut_cb_last_committed(struct lu_target *lut, __u64 transno,
-                           void *data, int err)
+struct lut_last_committed_callback {
+        struct dt_txn_commit_cb llcc_cb;
+        struct lu_target       *llcc_lut;
+        struct obd_export      *llcc_exp;
+        __u64                   llcc_transno;
+};
+
+void lut_cb_last_committed(struct lu_env *env, struct thandle *th,
+                           struct dt_txn_commit_cb *cb, int err)
 {
-        struct obd_export *exp = data;
-        LASSERT(exp->exp_obd == lut->lut_obd);
-        cfs_spin_lock(&lut->lut_translock);
-        if (transno > lut->lut_obd->obd_last_committed)
-                lut->lut_obd->obd_last_committed = transno;
-
-        LASSERT(exp);
-        if (transno > exp->exp_last_committed) {
-                exp->exp_last_committed = transno;
-                cfs_spin_unlock(&lut->lut_translock);
-                ptlrpc_commit_replies(exp);
+        struct lut_last_committed_callback *ccb;
+
+        ccb = container_of0(cb, struct lut_last_committed_callback, llcc_cb);
+
+        LASSERT(ccb->llcc_exp->exp_obd == ccb->llcc_lut->lut_obd);
+
+        cfs_spin_lock(&ccb->llcc_lut->lut_translock);
+        if (ccb->llcc_transno > ccb->llcc_lut->lut_obd->obd_last_committed)
+                ccb->llcc_lut->lut_obd->obd_last_committed = ccb->llcc_transno;
+
+        LASSERT(ccb->llcc_exp);
+        if (ccb->llcc_transno > ccb->llcc_exp->exp_last_committed) {
+                ccb->llcc_exp->exp_last_committed = ccb->llcc_transno;
+                cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
+                ptlrpc_commit_replies(ccb->llcc_exp);
         } else {
-                cfs_spin_unlock(&lut->lut_translock);
+                cfs_spin_unlock(&ccb->llcc_lut->lut_translock);
         }
-        class_export_cb_put(exp);
-        if (transno)
+        class_export_cb_put(ccb->llcc_exp);
+        if (ccb->llcc_transno)
                 CDEBUG(D_HA, "%s: transno "LPD64" is committed\n",
-                       lut->lut_obd->obd_name, transno);
+                       ccb->llcc_lut->lut_obd->obd_name, ccb->llcc_transno);
+        cfs_list_del(&ccb->llcc_cb.dcb_linkage);
+        OBD_FREE_PTR(ccb);
 }
-EXPORT_SYMBOL(lut_cb_last_committed);
 
-void lut_cb_client(struct lu_target *lut, __u64 transno,
-                       void *data, int err)
+int lut_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
+                           struct obd_export *exp, __u64 transno)
 {
-        LASSERT(lut->lut_obd);
-        target_client_add_cb(lut->lut_obd, transno, data, err);
+        struct lut_last_committed_callback *ccb;
+        int rc;
+
+        OBD_ALLOC_PTR(ccb);
+        if (ccb == NULL)
+                return -ENOMEM;
+
+        ccb->llcc_cb.dcb_func = lut_cb_last_committed;
+        CFS_INIT_LIST_HEAD(&ccb->llcc_cb.dcb_linkage);
+        ccb->llcc_lut = lut;
+        ccb->llcc_exp = class_export_cb_get(exp);
+        ccb->llcc_transno = transno;
+
+        rc = dt_trans_cb_add(th, &ccb->llcc_cb);
+        if (rc) {
+                class_export_cb_put(exp);
+                OBD_FREE_PTR(ccb);
+        }
+        return rc;
+}
+EXPORT_SYMBOL(lut_last_commit_cb_add);
+
+struct lut_new_client_callback {
+        struct dt_txn_commit_cb lncc_cb;
+        struct obd_export      *lncc_exp;
+};
+
+void lut_cb_new_client(struct lu_env *env, struct thandle *th,
+                       struct dt_txn_commit_cb *cb, int err)
+{
+        struct lut_new_client_callback *ccb;
+
+        ccb = container_of0(cb, struct lut_new_client_callback, lncc_cb);
+
+        LASSERT(ccb->lncc_exp->exp_obd);
+
+        CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
+               ccb->lncc_exp->exp_obd->obd_name,
+               ccb->lncc_exp->exp_client_uuid.uuid);
+
+        cfs_spin_lock(&ccb->lncc_exp->exp_lock);
+        ccb->lncc_exp->exp_need_sync = 0;
+        cfs_spin_unlock(&ccb->lncc_exp->exp_lock);
+        class_export_cb_put(ccb->lncc_exp);
+
+        cfs_list_del(&ccb->lncc_cb.dcb_linkage);
+        OBD_FREE_PTR(ccb);
+}
+
+int lut_new_client_cb_add(struct thandle *th, struct obd_export *exp)
+{
+        struct lut_new_client_callback *ccb;
+        int rc;
+
+        OBD_ALLOC_PTR(ccb);
+        if (ccb == NULL)
+                return -ENOMEM;
+
+        ccb->lncc_cb.dcb_func = lut_cb_new_client;
+        CFS_INIT_LIST_HEAD(&ccb->lncc_cb.dcb_linkage);
+        ccb->lncc_exp = class_export_cb_get(exp);
+
+        rc = dt_trans_cb_add(th, &ccb->lncc_cb);
+        if (rc) {
+                class_export_cb_put(exp);
+                OBD_FREE_PTR(ccb);
+        }
+        return rc;
 }
-EXPORT_SYMBOL(lut_cb_client);
+EXPORT_SYMBOL(lut_new_client_cb_add);
 
 int lut_init(const struct lu_env *env, struct lu_target *lut,
              struct obd_device *obd, struct dt_device *dt)