Whamcloud - gitweb
LU-3302 llog: Do not use ostid swab for llogid
[fs/lustre-release.git] / lustre / osp / osp_sync.c
index a6c3de0..fa90527 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, Intel Corporation.
+ * Copyright (c) 2012, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -230,12 +230,11 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
                osi->osi_unlink.lur_count = count;
                break;
        case MDS_SETATTR64_REC:
-               rc = fid_ostid_pack(fid, &osi->osi_oi);
+               rc = fid_to_ostid(fid, &osi->osi_oi);
                LASSERT(rc == 0);
                osi->osi_hdr.lrh_len = sizeof(osi->osi_setattr);
                osi->osi_hdr.lrh_type = MDS_SETATTR64_REC;
-               osi->osi_setattr.lsr_oid  = osi->osi_oi.oi_id;
-               osi->osi_setattr.lsr_oseq = osi->osi_oi.oi_seq;
+               osi->osi_setattr.lsr_oi  = osi->osi_oi;
                LASSERT(attr);
                osi->osi_setattr.lsr_uid = attr->la_uid;
                osi->osi_setattr.lsr_gid = attr->la_gid;
@@ -257,10 +256,8 @@ static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
                      NULL, th);
        llog_ctxt_put(ctxt);
 
-       CDEBUG(D_OTHER, "%s: new record %lu:%lu:%lu/%lu: %d\n",
-              d->opd_obd->obd_name,
-              (unsigned long) osi->osi_cookie.lgc_lgl.lgl_oid,
-              (unsigned long) osi->osi_cookie.lgc_lgl.lgl_oseq,
+       CDEBUG(D_OTHER, "%s: new record "DOSTID":%lu/%lu: %d\n",
+              d->opd_obd->obd_name, POSTID(&osi->osi_cookie.lgc_lgl.lgl_oi),
               (unsigned long) osi->osi_cookie.lgc_lgl.lgl_ogen,
               (unsigned long) osi->osi_cookie.lgc_index, rc);
 
@@ -316,25 +313,15 @@ int osp_sync_gap(const struct lu_env *env, struct osp_device *d,
 static void osp_sync_request_commit_cb(struct ptlrpc_request *req)
 {
        struct osp_device *d = req->rq_cb_data;
-       struct obd_import *imp = req->rq_import;
 
        CDEBUG(D_HA, "commit req %p, transno "LPU64"\n", req, req->rq_transno);
 
        if (unlikely(req->rq_transno == 0))
                return;
 
-       if (unlikely(req->rq_transno > imp->imp_peer_committed_transno)) {
-               /* this request was aborted by the shutdown procedure,
-                * not committed by the peer.  we should preserve llog
-                * record */
-               spin_lock(&d->opd_syn_lock);
-               d->opd_syn_rpc_in_progress--;
-               spin_unlock(&d->opd_syn_lock);
-               cfs_waitq_signal(&d->opd_syn_waitq);
-               return;
-       }
+       /* do not do any opd_dyn_rpc_* accounting here
+        * it's done in osp_sync_interpret sooner or later */
 
-       /* XXX: what if request isn't committed for very long? */
        LASSERT(d);
        LASSERT(req->rq_svc_thread == (void *) OSP_JOB_MAGIC);
        LASSERT(cfs_list_empty(&req->rq_exp_list));
@@ -354,7 +341,6 @@ static int osp_sync_interpret(const struct lu_env *env,
 {
        struct osp_device *d = req->rq_cb_data;
 
-       /* XXX: error handling here */
        if (req->rq_svc_thread != (void *) OSP_JOB_MAGIC)
                DEBUG_REQ(D_ERROR, req, "bad magic %p\n", req->rq_svc_thread);
        LASSERT(req->rq_svc_thread == (void *) OSP_JOB_MAGIC);
@@ -390,11 +376,11 @@ static int osp_sync_interpret(const struct lu_env *env,
                         "transno "LPU64", rc %d, gen: req %d, imp %d\n",
                         req->rq_transno, rc, req->rq_import_generation,
                         imp->imp_generation);
-               LASSERT(d->opd_syn_rpc_in_progress > 0);
                if (req->rq_transno == 0) {
                        /* this is the last time we see the request
                         * if transno is not zero, then commit cb
                         * will be called at some point */
+                       LASSERT(d->opd_syn_rpc_in_progress > 0);
                        spin_lock(&d->opd_syn_lock);
                        d->opd_syn_rpc_in_progress--;
                        spin_unlock(&d->opd_syn_lock);
@@ -500,8 +486,7 @@ static int osp_sync_new_setattr_job(struct osp_device *d,
 
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
-       body->oa.o_id  = rec->lsr_oid;
-       body->oa.o_seq = rec->lsr_oseq;
+       body->oa.o_oi = rec->lsr_oi;
        body->oa.o_uid = rec->lsr_uid;
        body->oa.o_gid = rec->lsr_gid;
        body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID |
@@ -529,8 +514,8 @@ static int osp_sync_new_unlink_job(struct osp_device *d,
 
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
-       body->oa.o_id  = rec->lur_oid;
-       body->oa.o_seq = rec->lur_oseq;
+       ostid_set_seq(&body->oa.o_oi, rec->lur_oseq);
+       ostid_set_id(&body->oa.o_oi, rec->lur_oid);
        body->oa.o_misc = rec->lur_count;
        body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
        if (rec->lur_count)
@@ -559,7 +544,7 @@ static int osp_sync_new_unlink64_job(struct osp_device *d,
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        if (body == NULL)
                RETURN(-EFAULT);
-       rc = fid_ostid_pack(&rec->lur_fid, &body->oa.o_oi);
+       rc = fid_to_ostid(&rec->lur_fid, &body->oa.o_oi);
        if (rc < 0)
                RETURN(rc);
        body->oa.o_misc = rec->lur_count;
@@ -738,6 +723,11 @@ static void osp_sync_process_committed(const struct lu_env *env,
 
        osp_sync_check_for_work(d);
 
+       /* wake up the thread if requested to stop:
+        * it might be waiting for in-progress to complete */
+       if (unlikely(osp_sync_running(d) == 0))
+               cfs_waitq_signal(&d->opd_syn_waitq);
+
        EXIT;
 }
 
@@ -832,7 +822,7 @@ static int osp_sync_thread(void *_arg)
        struct obd_device       *obd = d->opd_obd;
        struct llog_handle      *llh;
        struct lu_env            env;
-       int                      rc;
+       int                      rc, count;
        char                     pname[16];
 
        ENTRY;
@@ -844,7 +834,7 @@ static int osp_sync_thread(void *_arg)
                RETURN(rc);
        }
 
-       sprintf(pname, "osp-syn-%u\n", d->opd_index);
+       sprintf(pname, "osp-syn-%u", d->opd_index);
        cfs_daemonize(pname);
 
        spin_lock(&d->opd_syn_lock);
@@ -878,11 +868,21 @@ static int osp_sync_thread(void *_arg)
                 d->opd_syn_rpc_in_flight);
 
        /* wait till all the requests are completed */
+       count = 0;
        while (d->opd_syn_rpc_in_progress > 0) {
                osp_sync_process_committed(&env, d);
-               l_wait_event(d->opd_syn_waitq,
-                            d->opd_syn_rpc_in_progress == 0,
-                            &lwi);
+
+               lwi = LWI_TIMEOUT(cfs_time_seconds(5), NULL, NULL);
+               rc = l_wait_event(d->opd_syn_waitq,
+                                 d->opd_syn_rpc_in_progress == 0,
+                                 &lwi);
+               if (rc == -ETIMEDOUT)
+                       count++;
+               LASSERTF(count < 10, "%s: %d %d %sempty\n",
+                        d->opd_obd->obd_name, d->opd_syn_rpc_in_progress,
+                        d->opd_syn_rpc_in_flight,
+                        cfs_list_empty(&d->opd_syn_committed_there) ? "" :"!");
+
        }
 
        llog_cat_close(&env, llh);
@@ -904,12 +904,10 @@ out:
        RETURN(0);
 }
 
-static struct llog_operations osp_mds_ost_orig_logops;
-
 static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d)
 {
        struct osp_thread_info *osi = osp_env_info(env);
-       struct llog_handle     *lgh;
+       struct llog_handle     *lgh = NULL;
        struct obd_device      *obd = d->opd_obd;
        struct llog_ctxt       *ctxt;
        int                     rc;
@@ -932,12 +930,11 @@ static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d)
                RETURN(rc);
        }
 
-       CDEBUG(D_INFO, "%s: Init llog for %d - catid "LPX64"/"LPX64":%x\n",
-              obd->obd_name, d->opd_index, osi->osi_cid.lci_logid.lgl_oid,
-              osi->osi_cid.lci_logid.lgl_oseq,
+       CDEBUG(D_INFO, "%s: Init llog for %d - catid "DOSTID":%x\n",
+              obd->obd_name, d->opd_index,
+              POSTID(&osi->osi_cid.lci_logid.lgl_oi),
               osi->osi_cid.lci_logid.lgl_ogen);
 
-       osp_mds_ost_orig_logops = llog_osd_ops;
        rc = llog_setup(env, obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT, obd,
                        &osp_mds_ost_orig_logops);
        if (rc)
@@ -946,26 +943,25 @@ static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d)
        ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
        LASSERT(ctxt);
 
-       if (likely(osi->osi_cid.lci_logid.lgl_oid != 0)) {
+       if (likely(logid_id(&osi->osi_cid.lci_logid) != 0)) {
                rc = llog_open(env, ctxt, &lgh, &osi->osi_cid.lci_logid, NULL,
                               LLOG_OPEN_EXISTS);
                /* re-create llog if it is missing */
                if (rc == -ENOENT)
-                       osi->osi_cid.lci_logid.lgl_oid = 0;
+                       logid_set_id(&osi->osi_cid.lci_logid, 0);
                else if (rc < 0)
                        GOTO(out_cleanup, rc);
        }
 
-       if (unlikely(osi->osi_cid.lci_logid.lgl_oid == 0)) {
+       if (unlikely(logid_id(&osi->osi_cid.lci_logid) == 0)) {
                rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
                if (rc < 0)
                        GOTO(out_cleanup, rc);
                osi->osi_cid.lci_logid = lgh->lgh_id;
        }
 
+       LASSERT(lgh != NULL);
        ctxt->loc_handle = lgh;
-       lgh->lgh_logops->lop_add = llog_cat_add_rec;
-       lgh->lgh_logops->lop_declare_add = llog_cat_declare_add_rec;
 
        rc = llog_cat_init_and_process(env, lgh);
        if (rc)