LU-2349 osp: Move log ops init to module init.
diff --git a/lustre/osp/osp_sync.c b/lustre/osp/osp_sync.c
index 2f9cbd2..0616e87 100644
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Intel, Inc.
+ * Copyright (c) 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -268,9 +268,9 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
                rc = 0;
 
        if (likely(rc == 0)) {
-               cfs_spin_lock(&d->opd_syn_lock);
+               spin_lock(&d->opd_syn_lock);
                d->opd_syn_changes++;
-               cfs_spin_unlock(&d->opd_syn_lock);
+               spin_unlock(&d->opd_syn_lock);
        }
 
        RETURN(rc);
@@ -316,12 +316,24 @@ int osp_sync_gap(const struct lu_env *env, struct osp_device *d,
 static void osp_sync_request_commit_cb(struct ptlrpc_request *req)
 {
        struct osp_device *d = req->rq_cb_data;
+       struct obd_import *imp = req->rq_import;
 
        CDEBUG(D_HA, "commit req %p, transno "LPU64"\n", req, req->rq_transno);
 
        if (unlikely(req->rq_transno == 0))
                return;
 
+       if (unlikely(req->rq_transno > imp->imp_peer_committed_transno)) {
+               /* this request was aborted by the shutdown procedure,
+                * not committed by the peer.  we should preserve llog
+                * record */
+               spin_lock(&d->opd_syn_lock);
+               d->opd_syn_rpc_in_progress--;
+               spin_unlock(&d->opd_syn_lock);
+               cfs_waitq_signal(&d->opd_syn_waitq);
+               return;
+       }
+
        /* XXX: what if request isn't committed for very long? */
        LASSERT(d);
        LASSERT(req->rq_svc_thread == (void *) OSP_JOB_MAGIC);
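
Review note: the new branch above handles a commit callback that fires for a request the peer never actually committed — the import was invalidated during shutdown, so rq_transno is still ahead of imp_peer_committed_transno. Such a request must not be queued onto opd_syn_committed_there (that would cancel its llog record); only the in-progress counter is dropped so the sync thread can finish. A hypothetical helper, not part of the patch, that captures the predicate:

static inline int osp_sync_req_was_committed(struct ptlrpc_request *req)
{
        /* a transno of 0 means the request never reached a transaction;
         * a transno beyond the peer's committed transno at commit-callback
         * time means the callback came from import invalidation, not from
         * a real commit, so the llog record must be preserved for resend */
        return req->rq_transno != 0 &&
               req->rq_transno <= req->rq_import->imp_peer_committed_transno;
}
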
@@ -329,9 +341,9 @@ static void osp_sync_request_commit_cb(struct ptlrpc_request *req)
 
        ptlrpc_request_addref(req);
 
-       cfs_spin_lock(&d->opd_syn_lock);
+       spin_lock(&d->opd_syn_lock);
        cfs_list_add(&req->rq_exp_list, &d->opd_syn_committed_there);
-       cfs_spin_unlock(&d->opd_syn_lock);
+       spin_unlock(&d->opd_syn_lock);
 
        /* XXX: some batching wouldn't hurt */
        cfs_waitq_signal(&d->opd_syn_waitq);
@@ -363,9 +375,9 @@ static int osp_sync_interpret(const struct lu_env *env,
 
                ptlrpc_request_addref(req);
 
-               cfs_spin_lock(&d->opd_syn_lock);
+               spin_lock(&d->opd_syn_lock);
                cfs_list_add(&req->rq_exp_list, &d->opd_syn_committed_there);
-               cfs_spin_unlock(&d->opd_syn_lock);
+               spin_unlock(&d->opd_syn_lock);
 
                cfs_waitq_signal(&d->opd_syn_waitq);
        } else if (rc) {
@@ -383,9 +395,9 @@ static int osp_sync_interpret(const struct lu_env *env,
                        /* this is the last time we see the request
                         * if transno is not zero, then commit cb
                         * will be called at some point */
-                       cfs_spin_lock(&d->opd_syn_lock);
+                       spin_lock(&d->opd_syn_lock);
                        d->opd_syn_rpc_in_progress--;
-                       cfs_spin_unlock(&d->opd_syn_lock);
+                       spin_unlock(&d->opd_syn_lock);
                }
 
                cfs_waitq_signal(&d->opd_syn_waitq);
@@ -399,9 +411,9 @@ static int osp_sync_interpret(const struct lu_env *env,
        }
 
        LASSERT(d->opd_syn_rpc_in_flight > 0);
-       cfs_spin_lock(&d->opd_syn_lock);
+       spin_lock(&d->opd_syn_lock);
        d->opd_syn_rpc_in_flight--;
-       cfs_spin_unlock(&d->opd_syn_lock);
+       spin_unlock(&d->opd_syn_lock);
        CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
               d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
               d->opd_syn_rpc_in_progress);
@@ -594,10 +606,10 @@ static int osp_sync_process_record(const struct lu_env *env,
 
        /* notice we increment counters before sending RPC, to be consistent
         * in RPC interpret callback which may happen very quickly */
-       cfs_spin_lock(&d->opd_syn_lock);
+       spin_lock(&d->opd_syn_lock);
        d->opd_syn_rpc_in_flight++;
        d->opd_syn_rpc_in_progress++;
-       cfs_spin_unlock(&d->opd_syn_lock);
+       spin_unlock(&d->opd_syn_lock);
 
        switch (rec->lrh_type) {
        /* case MDS_UNLINK_REC is kept for compatibility */
@@ -617,7 +629,7 @@ static int osp_sync_process_record(const struct lu_env *env,
        }
 
        if (likely(rc == 0)) {
-               cfs_spin_lock(&d->opd_syn_lock);
+               spin_lock(&d->opd_syn_lock);
                if (d->opd_syn_prev_done) {
                        LASSERT(d->opd_syn_changes > 0);
                        LASSERT(rec->lrh_id <= d->opd_syn_last_committed_id);
@@ -633,12 +645,12 @@ static int osp_sync_process_record(const struct lu_env *env,
                CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
                       d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
                       d->opd_syn_rpc_in_progress);
-               cfs_spin_unlock(&d->opd_syn_lock);
+               spin_unlock(&d->opd_syn_lock);
        } else {
-               cfs_spin_lock(&d->opd_syn_lock);
+               spin_lock(&d->opd_syn_lock);
                d->opd_syn_rpc_in_flight--;
                d->opd_syn_rpc_in_progress--;
-               cfs_spin_unlock(&d->opd_syn_lock);
+               spin_unlock(&d->opd_syn_lock);
        }
 
        CDEBUG(D_HA, "found record %x, %d, idx %u, id %u: %d\n",
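
Review note on the ordering called out by the "notice we increment counters before sending RPC" comment: opd_syn_rpc_in_flight and opd_syn_rpc_in_progress are bumped before the request is handed to ptlrpcd because osp_sync_interpret() performs the matching decrement and may run almost immediately; incrementing after submission could let the counters underflow and trip LASSERT(d->opd_syn_rpc_in_flight > 0). A standalone sketch of the same rule (illustration only, not Lustre code):

#include <stdatomic.h>

static atomic_int in_flight;

/* completion callback: may run on another thread right after submission */
static void completion_cb(void)
{
        atomic_fetch_sub(&in_flight, 1);
}

/* bump the counter before handing the work off, never after, so the
 * decrement in completion_cb() can never observe a stale value */
static void submit(void (*send_async)(void (*cb)(void)))
{
        atomic_fetch_add(&in_flight, 1);
        send_async(completion_cb);
}
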
@@ -686,10 +698,10 @@ static void osp_sync_process_committed(const struct lu_env *env,
        LASSERT(llh);
 
        CFS_INIT_LIST_HEAD(&list);
-       cfs_spin_lock(&d->opd_syn_lock);
+       spin_lock(&d->opd_syn_lock);
        cfs_list_splice(&d->opd_syn_committed_there, &list);
        CFS_INIT_LIST_HEAD(&d->opd_syn_committed_there);
-       cfs_spin_unlock(&d->opd_syn_lock);
+       spin_unlock(&d->opd_syn_lock);
 
        cfs_list_for_each_entry_safe(req, tmp, &list, rq_exp_list) {
                LASSERT(req->rq_svc_thread == (void *) OSP_JOB_MAGIC);
@@ -717,9 +729,9 @@ static void osp_sync_process_committed(const struct lu_env *env,
        llog_ctxt_put(ctxt);
 
        LASSERT(d->opd_syn_rpc_in_progress >= done);
-       cfs_spin_lock(&d->opd_syn_lock);
+       spin_lock(&d->opd_syn_lock);
        d->opd_syn_rpc_in_progress -= done;
-       cfs_spin_unlock(&d->opd_syn_lock);
+       spin_unlock(&d->opd_syn_lock);
        CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
               d->opd_obd->obd_name, d->opd_syn_rpc_in_flight,
               d->opd_syn_rpc_in_progress);
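
Review note: osp_sync_process_committed() uses the splice-under-lock idiom — the shared opd_syn_committed_there list is detached onto a local head while opd_syn_lock is held, then walked with the lock dropped. A sketch of the same idiom written with the plain kernel list API (an assumption for illustration, not code from this patch):

#include <linux/list.h>
#include <linux/spinlock.h>

static void drain_committed(spinlock_t *lock, struct list_head *shared)
{
        LIST_HEAD(local);
        struct list_head *pos, *tmp;

        spin_lock(lock);
        list_splice_init(shared, &local);       /* shared is left empty */
        spin_unlock(lock);

        list_for_each_safe(pos, tmp, &local) {
                list_del_init(pos);
                /* process the detached entry without holding the lock */
        }
}
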
@@ -835,9 +847,9 @@ static int osp_sync_thread(void *_arg)
        sprintf(pname, "osp-syn-%u\n", d->opd_index);
        cfs_daemonize(pname);
 
-       cfs_spin_lock(&d->opd_syn_lock);
+       spin_lock(&d->opd_syn_lock);
        thread->t_flags = SVC_RUNNING;
-       cfs_spin_unlock(&d->opd_syn_lock);
+       spin_unlock(&d->opd_syn_lock);
        cfs_waitq_signal(&thread->t_ctl_waitq);
 
        ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
@@ -865,7 +877,13 @@ static int osp_sync_thread(void *_arg)
                 d->opd_syn_changes, d->opd_syn_rpc_in_progress,
                 d->opd_syn_rpc_in_flight);
 
-       osp_sync_process_committed(&env, d);
+       /* wait till all the requests are completed */
+       while (d->opd_syn_rpc_in_progress > 0) {
+               osp_sync_process_committed(&env, d);
+               l_wait_event(d->opd_syn_waitq,
+                            d->opd_syn_rpc_in_progress == 0,
+                            &lwi);
+       }
 
        llog_cat_close(&env, llh);
        rc = llog_cleanup(&env, ctxt);
@@ -874,13 +892,6 @@ static int osp_sync_thread(void *_arg)
 out:
        thread->t_flags = SVC_STOPPED;
 
-       /*
-        * there might be a race between osp sync thread sending RPCs and
-        * import invalidation. this can result in RPCs being in ptlrpcd
-        * till this point. for safete reason let's wait till they are done
-        */
-       l_wait_event(d->opd_syn_waitq, d->opd_syn_rpc_in_flight == 0, &lwi);
-
        cfs_waitq_signal(&thread->t_ctl_waitq);
        LASSERTF(d->opd_syn_rpc_in_progress == 0,
                 "%s: %d %d %sempty\n",
@@ -893,8 +904,6 @@ out:
        RETURN(0);
 }
 
-static struct llog_operations osp_mds_ost_orig_logops;
-
 static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d)
 {
        struct osp_thread_info *osi = osp_env_info(env);
@@ -926,7 +935,6 @@ static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d)
               osi->osi_cid.lci_logid.lgl_oseq,
               osi->osi_cid.lci_logid.lgl_ogen);
 
-       osp_mds_ost_orig_logops = llog_osd_ops;
        rc = llog_setup(env, obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, obd,
                        &osp_mds_ost_orig_logops);
        if (rc)
@@ -953,8 +961,6 @@ static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d)
        }
 
        ctxt->loc_handle = lgh;
-       lgh->lgh_logops->lop_add = llog_cat_add_rec;
-       lgh->lgh_logops->lop_declare_add = llog_cat_declare_add_rec;
 
        rc = llog_cat_init_and_process(env, lgh);
        if (rc)
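
Review note tying this to the commit subject: the per-device setup of osp_mds_ost_orig_logops removed above (together with the static definition removed earlier) is presumably replaced by a one-time initialization at module load in a file not shown in this diff. Roughly, under that assumption (the init function name below is made up):

/* shared across devices; declared extern in an OSP header */
struct llog_operations osp_mds_ost_orig_logops;

static int __init osp_mod_init(void)    /* name assumed, file not in this diff */
{
        /* copy the OSD-backed llog ops once and route record additions
         * through the catalog entry points, instead of patching the ops
         * table every time osp_sync_llog_init() runs */
        osp_mds_ost_orig_logops = llog_osd_ops;
        osp_mds_ost_orig_logops.lop_add = llog_cat_add_rec;
        osp_mds_ost_orig_logops.lop_declare_add = llog_cat_declare_add_rec;

        /* ... rest of module initialization ... */
        return 0;
}
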
@@ -1029,7 +1035,7 @@ int osp_sync_init(const struct lu_env *env, struct osp_device *d)
         */
        d->opd_syn_max_rpc_in_flight = OSP_MAX_IN_FLIGHT;
        d->opd_syn_max_rpc_in_progress = OSP_MAX_IN_PROGRESS;
-       cfs_spin_lock_init(&d->opd_syn_lock);
+       spin_lock_init(&d->opd_syn_lock);
        cfs_waitq_init(&d->opd_syn_waitq);
        cfs_waitq_init(&d->opd_syn_thread.t_ctl_waitq);
        CFS_INIT_LIST_HEAD(&d->opd_syn_committed_there);
@@ -1071,7 +1077,7 @@ int osp_sync_fini(struct osp_device *d)
        RETURN(0);
 }
 
-static CFS_DEFINE_MUTEX(osp_id_tracker_sem);
+static DEFINE_MUTEX(osp_id_tracker_sem);
 static CFS_LIST_HEAD(osp_id_tracker_list);
 
 static void osp_sync_tracker_commit_cb(struct thandle *th, void *cookie)
@@ -1086,7 +1092,7 @@ static void osp_sync_tracker_commit_cb(struct thandle *th, void *cookie)
        if (txn == NULL || txn->oti_current_id < tr->otr_committed_id)
                return;
 
-       cfs_spin_lock(&tr->otr_lock);
+       spin_lock(&tr->otr_lock);
        if (likely(txn->oti_current_id > tr->otr_committed_id)) {
                CDEBUG(D_OTHER, "committed: %u -> %u\n",
                       tr->otr_committed_id, txn->oti_current_id);
@@ -1098,7 +1104,7 @@ static void osp_sync_tracker_commit_cb(struct thandle *th, void *cookie)
                        cfs_waitq_signal(&d->opd_syn_waitq);
                }
        }
-       cfs_spin_unlock(&tr->otr_lock);
+       spin_unlock(&tr->otr_lock);
 }
 
 static int osp_sync_id_traction_init(struct osp_device *d)
@@ -1111,7 +1117,7 @@ static int osp_sync_id_traction_init(struct osp_device *d)
        LASSERT(d->opd_syn_tracker == NULL);
        CFS_INIT_LIST_HEAD(&d->opd_syn_ontrack);
 
-       cfs_mutex_lock(&osp_id_tracker_sem);
+       mutex_lock(&osp_id_tracker_sem);
        cfs_list_for_each_entry(tr, &osp_id_tracker_list, otr_list) {
                if (tr->otr_dev == d->opd_storage) {
                        LASSERT(cfs_atomic_read(&tr->otr_refcount));
@@ -1127,7 +1133,7 @@ static int osp_sync_id_traction_init(struct osp_device *d)
                OBD_ALLOC_PTR(tr);
                if (tr) {
                        d->opd_syn_tracker = tr;
-                       cfs_spin_lock_init(&tr->otr_lock);
+                       spin_lock_init(&tr->otr_lock);
                        tr->otr_dev = d->opd_storage;
                        tr->otr_next_id = 1;
                        tr->otr_committed_id = 0;
@@ -1142,7 +1148,7 @@ static int osp_sync_id_traction_init(struct osp_device *d)
                        rc = 0;
                }
        }
-       cfs_mutex_unlock(&osp_id_tracker_sem);
+       mutex_unlock(&osp_id_tracker_sem);
 
        return rc;
 }
@@ -1162,7 +1168,7 @@ static void osp_sync_id_traction_fini(struct osp_device *d)
 
        osp_sync_remove_from_tracker(d);
 
-       cfs_mutex_lock(&osp_id_tracker_sem);
+       mutex_lock(&osp_id_tracker_sem);
        if (cfs_atomic_dec_and_test(&tr->otr_refcount)) {
                dt_txn_callback_del(d->opd_storage, &tr->otr_tx_cb);
                LASSERT(cfs_list_empty(&tr->otr_wakeup_list));
@@ -1170,7 +1176,7 @@ static void osp_sync_id_traction_fini(struct osp_device *d)
                OBD_FREE_PTR(tr);
                d->opd_syn_tracker = NULL;
        }
-       cfs_mutex_unlock(&osp_id_tracker_sem);
+       mutex_unlock(&osp_id_tracker_sem);
 
        EXIT;
 }
@@ -1186,9 +1192,9 @@ static __u32 osp_sync_id_get(struct osp_device *d, __u32 id)
        LASSERT(tr);
 
        /* XXX: we can improve this introducing per-cpu preallocated ids? */
-       cfs_spin_lock(&tr->otr_lock);
+       spin_lock(&tr->otr_lock);
        if (unlikely(tr->otr_next_id <= d->opd_syn_last_used_id)) {
-               cfs_spin_unlock(&tr->otr_lock);
+               spin_unlock(&tr->otr_lock);
                CERROR("%s: next %u, last synced %lu\n",
                       d->opd_obd->obd_name, tr->otr_next_id,
                       d->opd_syn_last_used_id);
@@ -1201,7 +1207,7 @@ static __u32 osp_sync_id_get(struct osp_device *d, __u32 id)
                d->opd_syn_last_used_id = id;
        if (cfs_list_empty(&d->opd_syn_ontrack))
                cfs_list_add(&d->opd_syn_ontrack, &tr->otr_wakeup_list);
-       cfs_spin_unlock(&tr->otr_lock);
+       spin_unlock(&tr->otr_lock);
        CDEBUG(D_OTHER, "new id %u\n", (unsigned) id);
 
        return id;
@@ -1217,8 +1223,8 @@ static void osp_sync_remove_from_tracker(struct osp_device *d)
        if (cfs_list_empty(&d->opd_syn_ontrack))
                return;
 
-       cfs_spin_lock(&tr->otr_lock);
+       spin_lock(&tr->otr_lock);
        cfs_list_del_init(&d->opd_syn_ontrack);
-       cfs_spin_unlock(&tr->otr_lock);
+       spin_unlock(&tr->otr_lock);
 }