+ RETURN(rc);
+}
+
+/*
+ * Simplified last_rcvd update used for echo client simulation.
+ *
+ * When the transaction succeeded (th->th_result == 0) the next target
+ * transno is allocated and, if an object was supplied, stored as the
+ * object's version. The export's client slot is then rewritten with the
+ * transno and the transaction result. The update itself is kept simple,
+ * but every lock is still taken so that the simulation shows the same
+ * drawbacks.
+ *
+ * Returns the result of tgt_client_data_write().
+ */
+static int tgt_last_rcvd_update_echo(const struct lu_env *env,
+				     struct lu_target *tgt,
+				     struct dt_object *obj,
+				     struct thandle *th,
+				     struct obd_export *exp)
+{
+	struct tg_export_data *ted = &exp->exp_target_data;
+	struct tgt_thread_info *info = tgt_th_info(env);
+	struct lsd_client_data *slot;
+	int cb_failed;
+	int rc;
+
+	ENTRY;
+
+	/* only a successful transaction consumes a new transno */
+	spin_lock(&tgt->lut_translock);
+	if (th->th_result == 0)
+		info->tti_transno = ++tgt->lut_last_transno;
+	else
+		info->tti_transno = 0;
+	spin_unlock(&tgt->lut_translock);
+
+	/** VBR: store the new version on the object */
+	if (obj != NULL && th->th_result == 0)
+		dt_version_set(env, obj, info->tti_transno, th);
+
+	/* without a commit callback the write has to be synchronous */
+	cb_failed = !!tgt_last_commit_cb_add(th, tgt, exp,
+					     info->tti_transno);
+	th->th_sync |= cb_failed;
+
+	LASSERT(ted->ted_lr_off > 0);
+
+	mutex_lock(&ted->ted_lcd_lock);
+	/* a zero transno is only legal for a failed transaction */
+	LASSERT(ergo(info->tti_transno == 0, th->th_result != 0));
+	slot = ted->ted_lcd;
+	slot->lcd_last_transno = info->tti_transno;
+	slot->lcd_last_result = th->th_result;
+
+	info->tti_off = ted->ted_lr_off;
+	rc = tgt_client_data_write(env, tgt, slot, &info->tti_off, th);
+	mutex_unlock(&ted->ted_lcd_lock);
+
+	RETURN(rc);
+}
+
+/*
+ * Recreate client exports from the per-client slots of last_rcvd.
+ *
+ * Scans the client area of last_rcvd up to last_size. Each slot with a
+ * non-empty uuid gets a new export that is attached to its slot index; the
+ * target's last transno is raised to the slot's transno when that is
+ * higher. If OBD_INCOMPAT_MULTI_RPCS is set on disk and the slot has a
+ * non-zero generation, the export is also added to the generation hash.
+ * The highest generation seen is stored in tgt->lut_client_generation.
+ *
+ * Nothing is done when the bottom device is read-only. A slot read error
+ * only ends the scan early and is not reported to the caller.
+ *
+ * Returns 0 on success, or the error code of the step that failed.
+ */
+static int tgt_clients_data_init(const struct lu_env *env,
+				 struct lu_target *tgt,
+				 unsigned long last_size)
+{
+	struct lr_server_data *lsd = &tgt->lut_lsd;
+	struct obd_device *obd = tgt->lut_obd;
+	struct lsd_client_data *lcd = NULL;
+	struct cfs_hash *gen_hash = NULL;
+	loff_t pos = lsd->lsd_client_start;
+	__u32 max_gen = 0;
+	int idx;
+	int rc = 0;
+
+	ENTRY;
+
+	if (tgt->lut_bottom->dd_rdonly)
+		RETURN(0);
+
+	/* lcd_padding has to end exactly at LR_CLIENT_SIZE */
+	BUILD_BUG_ON(offsetof(struct lsd_client_data, lcd_padding) +
+		     sizeof(lcd->lcd_padding) != LR_CLIENT_SIZE);
+
+	OBD_ALLOC_PTR(lcd);
+	if (lcd == NULL)
+		RETURN(-ENOMEM);
+
+	gen_hash = cfs_hash_getref(obd->obd_gen_hash);
+	if (gen_hash == NULL)
+		GOTO(err_out, rc = -ENODEV);
+
+	for (idx = 0; pos < last_size; idx++) {
+		struct obd_export *exp;
+		__u64 last_transno;
+
+		/*
+		 * Recompute the slot offset from the index on every pass
+		 * instead of relying on the offset left by the previous
+		 * read: the on-disk lsd_client_size may not match
+		 * sizeof(*lcd).
+		 */
+		pos = lsd->lsd_client_start + idx * lsd->lsd_client_size;
+		rc = tgt_client_data_read(env, tgt, lcd, &pos, idx);
+		if (rc != 0) {
+			CERROR("%s: error reading last_rcvd %s idx %d off "
+			       "%llu: rc = %d\n", tgt_name(tgt), LAST_RCVD,
+			       idx, pos, rc);
+			/* a read error must not make startup fail */
+			rc = 0;
+			break;
+		}
+
+		if (lcd->lcd_uuid[0] == '\0') {
+			CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
+			       idx);
+			continue;
+		}
+
+		last_transno = lcd_last_transno(lcd);
+
+		/*
+		 * Disconnect is what cleans these exports up later, so they
+		 * have to be set up the same way a real connect does it.
+		 */
+		CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: %llu"
+		       " srv lr: %llu lx: %llu gen %u\n", lcd->lcd_uuid,
+		       idx, last_transno, lsd->lsd_last_transno,
+		       lcd_last_xid(lcd), lcd->lcd_generation);
+
+		exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
+		if (IS_ERR(exp)) {
+			if (PTR_ERR(exp) != -EALREADY)
+				GOTO(err_out, rc = PTR_ERR(exp));
+
+			/* this uuid already has an export: report, move on */
+			CERROR("%s: Duplicate export %s!\n",
+			       tgt_name(tgt), lcd->lcd_uuid);
+			continue;
+		}
+
+		/* seed the export's client data with the on-disk record */
+		*exp->exp_target_data.ted_lcd = *lcd;
+
+		rc = tgt_client_add(env, exp, idx);
+		/* adding a slot that already exists on disk can't fail */
+		LASSERTF(rc == 0, "rc = %d\n", rc);
+
+		/* VBR: set export last committed version */
+		exp->exp_last_committed = last_transno;
+
+		spin_lock(&exp->exp_lock);
+		exp->exp_connecting = 0;
+		exp->exp_in_recovery = 0;
+		spin_unlock(&exp->exp_lock);
+
+		atomic_inc(&obd->obd_max_recoverable_clients);
+
+		if ((lsd->lsd_feature_incompat & OBD_INCOMPAT_MULTI_RPCS) &&
+		    lcd->lcd_generation != 0) {
+			/* keep track of the highest valid generation */
+			if (lcd->lcd_generation > max_gen)
+				max_gen = lcd->lcd_generation;
+
+			/* client_generation <-> export hash table */
+			rc = cfs_hash_add_unique(gen_hash,
+						 &lcd->lcd_generation,
+						 &exp->exp_gen_hash);
+			if (rc != 0) {
+				CERROR("%s: duplicate export for client "
+				       "generation %u\n",
+				       tgt_name(tgt), lcd->lcd_generation);
+				class_export_put(exp);
+				GOTO(err_out, rc);
+			}
+		}
+
+		class_export_put(exp);
+
+		/*
+		 * NOTE(review): exp is still used after the put above;
+		 * presumably another reference keeps the export alive at
+		 * this point - confirm against class_new_export().
+		 */
+		rc = rev_import_init(exp);
+		if (rc != 0) {
+			class_unlink_export(exp);
+			GOTO(err_out, rc);
+		}
+
+		CDEBUG(D_OTHER, "client at idx %d has last_transno = %llu\n",
+		       idx, last_transno);
+
+		/* the target's transno must cover this client's transno */
+		spin_lock(&tgt->lut_translock);
+		tgt->lut_last_transno = max(last_transno,
+					    tgt->lut_last_transno);
+		spin_unlock(&tgt->lut_translock);
+	}
+
+	/* record highest valid client generation */
+	atomic_set(&tgt->lut_client_generation, max_gen);
+
+err_out:
+	if (gen_hash != NULL)
+		cfs_hash_putref(gen_hash);
+	OBD_FREE_PTR(lcd);
+	RETURN(rc);
+}
+
+struct server_compat_data { /* last_rcvd feature flag sets for one server type */
+ __u32 rocompat; /* rocompat flags accepted; any other bit on disk fails init */
+ __u32 incompat; /* incompat flags accepted; any other bit on disk fails init */
+ __u32 rocinit; /* rocompat flags given to a newly initialized last_rcvd */
+ __u32 incinit; /* incompat flags given to a newly initialized last_rcvd */
+};
+
+/*
+ * Feature flag sets per server type, indexed by LDD_F_SV_TYPE_*.
+ *
+ * The *init members are the flags given to a newly initialized last_rcvd;
+ * rocompat/incompat are the masks of flags accepted from an existing one.
+ */
+static struct server_compat_data tgt_scd[] = {
+	[LDD_F_SV_TYPE_MDT] = {
+		/* new disk */
+		.rocinit	= OBD_ROCOMPAT_LOVOBJID,
+		.incinit	= OBD_INCOMPAT_MDT |
+				  OBD_INCOMPAT_COMMON_LR |
+				  OBD_INCOMPAT_MULTI_OI,
+		/* accepted on an existing disk */
+		.rocompat	= OBD_ROCOMPAT_LOVOBJID,
+		.incompat	= OBD_INCOMPAT_MDT |
+				  OBD_INCOMPAT_COMMON_LR |
+				  OBD_INCOMPAT_FID |
+				  OBD_INCOMPAT_IAM_DIR |
+				  OBD_INCOMPAT_LMM_VER |
+				  OBD_INCOMPAT_MULTI_OI |
+				  OBD_INCOMPAT_MULTI_RPCS,
+	},
+	[LDD_F_SV_TYPE_OST] = {
+		/* new disk */
+		.rocinit	= OBD_ROCOMPAT_IDX_IN_IDIF,
+		.incinit	= OBD_INCOMPAT_OST |
+				  OBD_INCOMPAT_COMMON_LR,
+		/* accepted on an existing disk */
+		.rocompat	= OBD_ROCOMPAT_IDX_IN_IDIF,
+		.incompat	= OBD_INCOMPAT_OST |
+				  OBD_INCOMPAT_COMMON_LR |
+				  OBD_INCOMPAT_FID,
+	},
+};
+
+int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
+{
+ struct tgt_thread_info *tti = tgt_th_info(env);
+ struct lr_server_data *lsd = &tgt->lut_lsd;
+ unsigned long last_rcvd_size;
+ __u32 index;
+ int rc, type;
+
+ rc = dt_attr_get(env, tgt->lut_last_rcvd, &tti->tti_attr);
+ if (rc)
+ RETURN(rc);
+
+ last_rcvd_size = (unsigned long)tti->tti_attr.la_size;
+
+ /* ensure padding in the struct is the correct size */
+ BUILD_BUG_ON(offsetof(struct lr_server_data, lsd_padding) +
+ sizeof(lsd->lsd_padding) != LR_SERVER_SIZE);
+
+ rc = server_name2index(tgt_name(tgt), &index, NULL);
+ if (rc < 0) {
+ CERROR("%s: Can not get index from name: rc = %d\n",
+ tgt_name(tgt), rc);
+ RETURN(rc);
+ }
+ /* server_name2index() returns type */
+ type = rc;
+ if (type != LDD_F_SV_TYPE_MDT && type != LDD_F_SV_TYPE_OST) {
+ CERROR("%s: unknown target type %x\n", tgt_name(tgt), type);
+ RETURN(-EINVAL);
+ }
+
+ /* last_rcvd on OST doesn't provide reconstruct support because there
+ * may be up to 8 in-flight write requests per single slot in
+ * last_rcvd client data
+ */
+ tgt->lut_no_reconstruct = (type == LDD_F_SV_TYPE_OST);
+
+ if (last_rcvd_size == 0) {
+ LCONSOLE_WARN("%s: new disk, initializing\n", tgt_name(tgt));
+
+ memcpy(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid,
+ sizeof(lsd->lsd_uuid));
+ lsd->lsd_last_transno = 0;
+ lsd->lsd_mount_count = 0;
+ lsd->lsd_server_size = LR_SERVER_SIZE;
+ lsd->lsd_client_start = LR_CLIENT_START;
+ lsd->lsd_client_size = LR_CLIENT_SIZE;
+ lsd->lsd_subdir_count = OBJ_SUBDIR_COUNT;
+ lsd->lsd_osd_index = index;
+ lsd->lsd_feature_rocompat = tgt_scd[type].rocinit;
+ lsd->lsd_feature_incompat = tgt_scd[type].incinit;
+ } else {
+ rc = tgt_server_data_read(env, tgt);
+ if (rc) {
+ CERROR("%s: error reading LAST_RCVD: rc= %d\n",
+ tgt_name(tgt), rc);
+ RETURN(rc);
+ }
+ if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
+ if (tgt->lut_bottom->dd_rdonly) {
+ /* Such difference may be caused by mounting
+ * up snapshot with new fsname under rd_only
+ * mode. But even if it was NOT, it will not
+ * damage the system because of "rd_only". */
+ memcpy(lsd->lsd_uuid,
+ tgt->lut_obd->obd_uuid.uuid,
+ sizeof(lsd->lsd_uuid));
+ } else {
+ LCONSOLE_ERROR_MSG(0x157, "Trying to start "
+ "OBD %s using the wrong "
+ "disk %s. Were the /dev/ "
+ "assignments rearranged?\n",
+ tgt->lut_obd->obd_uuid.uuid,
+ lsd->lsd_uuid);
+ RETURN(-EINVAL);
+ }
+ }
+
+ if (lsd->lsd_osd_index != index) {
+ LCONSOLE_ERROR_MSG(0x157, "%s: index %d in last rcvd "
+ "is different with the index %d in"
+ "config log, It might be disk"
+ "corruption!\n", tgt_name(tgt),
+ lsd->lsd_osd_index, index);
+ RETURN(-EINVAL);
+ }
+ }
+
+ if (lsd->lsd_feature_incompat & ~tgt_scd[type].incompat) {
+ CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
+ tgt_name(tgt),
+ lsd->lsd_feature_incompat & ~tgt_scd[type].incompat);
+ RETURN(-EINVAL);
+ }
+
+ if (type == LDD_F_SV_TYPE_MDT)
+ lsd->lsd_feature_incompat |= OBD_INCOMPAT_FID;
+
+ if (lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat) {
+ CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
+ tgt_name(tgt),
+ lsd->lsd_feature_rocompat & ~tgt_scd[type].rocompat);
+ RETURN(-EINVAL);
+ }
+ /** Interop: evict all clients at first boot with 1.8 last_rcvd */
+ if (type == LDD_F_SV_TYPE_MDT &&
+ !(lsd->lsd_feature_compat & OBD_COMPAT_20)) {
+ if (last_rcvd_size > lsd->lsd_client_start) {
+ LCONSOLE_WARN("%s: mounting at first time on 1.8 FS, "
+ "remove all clients for interop needs\n",
+ tgt_name(tgt));
+ rc = tgt_truncate_last_rcvd(env, tgt,
+ lsd->lsd_client_start);
+ if (rc)
+ RETURN(rc);
+ last_rcvd_size = lsd->lsd_client_start;
+ }
+ /** set 2.0 flag to upgrade/downgrade between 1.8 and 2.0 */
+ lsd->lsd_feature_compat |= OBD_COMPAT_20;
+ }
+
+ spin_lock(&tgt->lut_translock);
+ tgt->lut_last_transno = lsd->lsd_last_transno;
+ spin_unlock(&tgt->lut_translock);
+
+ lsd->lsd_mount_count++;
+
+ CDEBUG(D_INODE, "=======,=BEGIN DUMPING LAST_RCVD========\n");
+ CDEBUG(D_INODE, "%s: server last_transno: %llu\n",
+ tgt_name(tgt), tgt->lut_last_transno);
+ CDEBUG(D_INODE, "%s: server mount_count: %llu\n",
+ tgt_name(tgt), lsd->lsd_mount_count);
+ CDEBUG(D_INODE, "%s: server data size: %u\n",
+ tgt_name(tgt), lsd->lsd_server_size);
+ CDEBUG(D_INODE, "%s: per-client data start: %u\n",
+ tgt_name(tgt), lsd->lsd_client_start);
+ CDEBUG(D_INODE, "%s: per-client data size: %u\n",
+ tgt_name(tgt), lsd->lsd_client_size);
+ CDEBUG(D_INODE, "%s: last_rcvd size: %lu\n",
+ tgt_name(tgt), last_rcvd_size);
+ CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
+ tgt_name(tgt), lsd->lsd_subdir_count);
+ CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", tgt_name(tgt),
+ last_rcvd_size <= lsd->lsd_client_start ? 0 :
+ (last_rcvd_size - lsd->lsd_client_start) /
+ lsd->lsd_client_size);
+ CDEBUG(D_INODE, "========END DUMPING LAST_RCVD========\n");
+
+ if (lsd->lsd_server_size == 0 || lsd->lsd_client_start == 0 ||
+ lsd->lsd_client_size == 0) {
+ CERROR("%s: bad last_rcvd contents!\n", tgt_name(tgt));
+ RETURN(-EINVAL);
+ }
+
+ if (!tgt->lut_obd->obd_replayable)
+ CWARN("%s: recovery support OFF\n", tgt_name(tgt));
+
+ rc = tgt_clients_data_init(env, tgt, last_rcvd_size);
+ if (rc < 0)
+ GOTO(err_client, rc);
+
+ spin_lock(&tgt->lut_translock);
+ /* obd_last_committed is used for compatibility
+ * with other lustre recovery code */
+ tgt->lut_obd->obd_last_committed = tgt->lut_last_transno;
+ spin_unlock(&tgt->lut_translock);
+
+ tgt->lut_obd->u.obt.obt_mount_count = lsd->lsd_mount_count;
+ tgt->lut_obd->u.obt.obt_instance = (__u32)lsd->lsd_mount_count;
+
+ /* save it, so mount count and last_transno is current */
+ rc = tgt_server_data_update(env, tgt, 0);
+ if (rc < 0)
+ GOTO(err_client, rc);
+
+ RETURN(0);
+
+err_client:
+ class_disconnect_exports(tgt->lut_obd);