Whamcloud - gitweb
LU-243 async lov_sync() operation
[fs/lustre-release.git] / lustre / lov / lov_obd.c
index c13ccaf..61fdb7b 100644 (file)
@@ -121,7 +121,7 @@ static void lov_putref(struct obd_device *obd)
 }
 
 static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
-                              int activate);
+                              enum obd_notify_event ev);
 static int lov_notify(struct obd_device *obd, struct obd_device *watched,
                       enum obd_notify_event ev, void *data);
 
@@ -384,15 +384,15 @@ out:
  *  any >= 0 : is log target index
  */
 static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
-                              int activate)
+                              enum obd_notify_event ev)
 {
         struct lov_obd *lov = &obd->u.lov;
         struct lov_tgt_desc *tgt;
-        int index;
+        int index, activate, active;
         ENTRY;
 
-        CDEBUG(D_INFO, "Searching in lov %p for uuid %s (activate=%d)\n",
-               lov, uuid->uuid, activate);
+        CDEBUG(D_INFO, "Searching in lov %p for uuid %s event(%d)\n",
+               lov, uuid->uuid, ev);
 
         obd_getref(obd);
         for (index = 0; index < lov->desc.ld_tgt_count; index++) {
@@ -410,26 +410,43 @@ static int lov_set_osc_active(struct obd_device *obd, struct obd_uuid *uuid,
         if (index == lov->desc.ld_tgt_count)
                 GOTO(out, index = -EINVAL);
 
-        if (lov->lov_tgts[index]->ltd_active == activate) {
-                CDEBUG(D_INFO, "OSC %s already %sactive!\n", uuid->uuid,
-                       activate ? "" : "in");
-                GOTO(out, index);
-        }
+        if (ev == OBD_NOTIFY_DEACTIVATE || ev == OBD_NOTIFY_ACTIVATE) {
+                activate = (ev == OBD_NOTIFY_ACTIVATE) ? 1 : 0;
 
-        CDEBUG(D_CONFIG, "Marking OSC %s %sactive\n", obd_uuid2str(uuid),
-               activate ? "" : "in");
+                if (lov->lov_tgts[index]->ltd_activate == activate) {
+                        CDEBUG(D_INFO, "OSC %s already %sactivate!\n",
+                               uuid->uuid, activate ? "" : "de");
+                } else {
+                        lov->lov_tgts[index]->ltd_activate = activate;
+                        CDEBUG(D_CONFIG, "%sactivate OSC %s\n",
+                               activate ? "" : "de", obd_uuid2str(uuid));
+                }
 
-        lov->lov_tgts[index]->ltd_active = activate;
+        } else if (ev == OBD_NOTIFY_INACTIVE || ev == OBD_NOTIFY_ACTIVE) {
+                active = (ev == OBD_NOTIFY_ACTIVE) ? 1 : 0;
 
-        if (activate) {
-                lov->desc.ld_active_tgt_count++;
-                lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 0;
+                if (lov->lov_tgts[index]->ltd_active == active) {
+                        CDEBUG(D_INFO, "OSC %s already %sactive!\n",
+                               uuid->uuid, active ? "" : "in");
+                        GOTO(out, index);
+                } else {
+                        CDEBUG(D_CONFIG, "Marking OSC %s %sactive\n",
+                               obd_uuid2str(uuid), active ? "" : "in");
+                }
+
+                lov->lov_tgts[index]->ltd_active = active;
+                if (active) {
+                        lov->desc.ld_active_tgt_count++;
+                        lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 0;
+                } else {
+                        lov->desc.ld_active_tgt_count--;
+                        lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 1;
+                }
+                /* remove any old qos penalty */
+                lov->lov_tgts[index]->ltd_qos.ltq_penalty = 0;
         } else {
-                lov->desc.ld_active_tgt_count--;
-                lov->lov_tgts[index]->ltd_exp->exp_obd->obd_inactive = 1;
+                CERROR("Unknown event(%d) for uuid %s", ev, uuid->uuid);
         }
-        /* remove any old qos penalty */
-        lov->lov_tgts[index]->ltd_qos.ltq_penalty = 0;
 
  out:
         obd_putref(obd);
@@ -442,7 +459,8 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
         int rc = 0;
         ENTRY;
 
-        if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE) {
+        if (ev == OBD_NOTIFY_ACTIVE || ev == OBD_NOTIFY_INACTIVE ||
+            ev == OBD_NOTIFY_ACTIVATE || ev == OBD_NOTIFY_DEACTIVATE) {
                 struct obd_uuid *uuid;
 
                 LASSERT(watched);
@@ -458,10 +476,9 @@ static int lov_notify(struct obd_device *obd, struct obd_device *watched,
                 /* Set OSC as active before notifying the observer, so the
                  * observer can use the OSC normally.
                  */
-                rc = lov_set_osc_active(obd, uuid, ev == OBD_NOTIFY_ACTIVE);
+                rc = lov_set_osc_active(obd, uuid, ev);
                 if (rc < 0) {
-                        CERROR("%sactivation of %s failed: %d\n",
-                               (ev == OBD_NOTIFY_ACTIVE) ? "" : "de",
+                        CERROR("event(%d) of %s failed: %d\n", ev,
                                obd_uuid2str(uuid), rc);
                         RETURN(rc);
                 }
@@ -1547,49 +1564,71 @@ static int lov_punch(struct obd_export *exp, struct obd_info *oinfo,
         RETURN(0);
 }
 
-static int lov_sync(struct obd_export *exp, struct obdo *oa,
-                    struct lov_stripe_md *lsm, obd_off start, obd_off end,
-                    void *capa)
+static int lov_sync_interpret(struct ptlrpc_request_set *rqset,
+                              void *data, int rc)
 {
-        struct lov_request_set *set;
-        struct obd_info oinfo;
+        struct lov_request_set *lovset = data;
+        int err;
+        ENTRY;
+
+        if (rc)
+                lovset->set_completes = 0;
+        err = lov_fini_sync_set(lovset);
+        RETURN(rc ?: err);
+}
+
+static int lov_sync(struct obd_export *exp, struct obd_info *oinfo,
+                    obd_off start, obd_off end,
+                    struct ptlrpc_request_set *rqset)
+{
+        struct lov_request_set *set = NULL;
         struct lov_obd *lov;
         cfs_list_t *pos;
         struct lov_request *req;
-        int err = 0, rc = 0;
+        int rc = 0;
         ENTRY;
 
-        ASSERT_LSM_MAGIC(lsm);
+        ASSERT_LSM_MAGIC(oinfo->oi_md);
+        LASSERT(rqset != NULL);
 
         if (!exp->exp_obd)
                 RETURN(-ENODEV);
 
         lov = &exp->exp_obd->u.lov;
-        rc = lov_prep_sync_set(exp, &oinfo, oa, lsm, start, end, &set);
+        rc = lov_prep_sync_set(exp, oinfo, start, end, &set);
         if (rc)
                 RETURN(rc);
 
+        CDEBUG(D_INFO, "fsync objid "LPX64" ["LPX64", "LPX64"]\n",
+               set->set_oi->oi_oa->o_id, start, end);
+
         cfs_list_for_each (pos, &set->set_list) {
                 req = cfs_list_entry(pos, struct lov_request, rq_link);
 
-                rc = obd_sync(lov->lov_tgts[req->rq_idx]->ltd_exp,
-                              req->rq_oi.oi_oa, NULL,
+                rc = obd_sync(lov->lov_tgts[req->rq_idx]->ltd_exp, &req->rq_oi,
                               req->rq_oi.oi_policy.l_extent.start,
-                              req->rq_oi.oi_policy.l_extent.end, capa);
-                err = lov_update_common_set(set, req, rc);
-                if (err) {
+                              req->rq_oi.oi_policy.l_extent.end, rqset);
+                if (rc) {
                         CERROR("error: fsync objid "LPX64" subobj "LPX64
                                " on OST idx %d: rc = %d\n",
                                set->set_oi->oi_oa->o_id,
                                req->rq_oi.oi_oa->o_id, req->rq_idx, rc);
-                        if (!rc)
-                                rc = err;
+                        break;
                 }
         }
-        err = lov_fini_sync_set(set);
-        if (!rc)
-                rc = err;
-        RETURN(rc);
+
+        /* If we are not waiting for responses on async requests, return. */
+        if (rc || cfs_list_empty(&rqset->set_requests)) {
+                int err = lov_fini_sync_set(set);
+
+                RETURN(rc ?: err);
+        }
+
+        LASSERT(rqset->set_interpret == NULL);
+        rqset->set_interpret = lov_sync_interpret;
+        rqset->set_arg = (void *)set;
+
+        RETURN(0);
 }
 
 static int lov_brw_check(struct lov_obd *lov, struct obd_info *lov_oinfo,
@@ -2332,7 +2371,7 @@ static int lov_fiemap(struct lov_obd *lov, __u32 keylen, void *key,
         if (fiemap_count_to_size(fm_key->fiemap.fm_extent_count) < buffer_size)
                 buffer_size = fiemap_count_to_size(fm_key->fiemap.fm_extent_count);
 
-        OBD_ALLOC(fm_local, buffer_size);
+        OBD_ALLOC_LARGE(fm_local, buffer_size);
         if (fm_local == NULL)
                 GOTO(out, rc = -ENOMEM);
         lcl_fm_ext = &fm_local->fm_extents[0];
@@ -2523,7 +2562,7 @@ skip_last_device_calc:
         fiemap->fm_mapped_extents = current_extent;
 
 out:
-        OBD_FREE(fm_local, buffer_size);
+        OBD_FREE_LARGE(fm_local, buffer_size);
         return rc;
 }