* before failure.
*/
space->lr_end = in->lr_start + LUSTRE_SEQ_SUPER_WIDTH;
+
+ if (!seq->lss_cli) {
+ CERROR("%s: No sequence controller "
+ "is attached.\n", seq->lss_name);
+ RETURN(-ENODEV);
+ }
+
+ /*
+ * Let controller know that this is recovery and last
+ * obtained range from it was @space.
+ */
+ rc = seq_client_replay_super(seq->lss_cli, space, env);
+ if (rc) {
+ CERROR("%s: Can't replay super-sequence, "
+ "rc %d\n", seq->lss_name, rc);
+ RETURN(rc);
+ }
} else {
/*
* Update super start by end from client's range. Super
/* Request backing store for saved sequence info. */
rc = seq_store_read(seq, env);
if (rc == -ENODATA) {
- CDEBUG(D_INFO|D_WARNING, "%s: No data found "
- "on storage\n", seq->lss_name);
-
+
/* Nothing is read, init by default value. */
seq->lss_space = is_srv ?
LUSTRE_SEQ_ZERO_RANGE:
LUSTRE_SEQ_SPACE_RANGE;
- if (!is_srv) {
- /* Save default controller value to store. */
- rc = seq_store_write(seq, env);
- if (rc) {
- CERROR("%s: Can't write space data, "
- "rc %d\n", seq->lss_name, rc);
- }
+ CDEBUG(D_INFO|D_WARNING, "%s: No data found "
+ "on store. Initialize space\n",
+ seq->lss_name);
+
+ /* Save default controller value to store. */
+ rc = seq_store_write(seq, env);
+ if (rc) {
+ CERROR("%s: Can't write space data, "
+ "rc %d\n", seq->lss_name, rc);
}
} else if (rc) {
CERROR("%s: Can't read space data, rc %d\n",
#include "fid_internal.h"
static int seq_client_rpc(struct lu_client_seq *seq,
- struct lu_range *space,
+ struct lu_range *input,
+ struct lu_range *output,
__u32 opc, const char *opcname)
{
int rc, size[3] = { sizeof(struct ptlrpc_body),
req_capsule_init(&pill, req, RCL_CLIENT, NULL);
req_capsule_set(&pill, &RQF_SEQ_QUERY);
- /* init operation code */
+ /* Init operation code */
op = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
*op = opc;
- /* zero out input range, this is not recovery yet. */
+ /* Zero out input range, this is not recovery yet. */
in = req_capsule_client_get(&pill, &RMF_SEQ_RANGE);
- range_zero(in);
+ if (input != NULL)
+ *in = *input;
+ else
+ range_zero(in);
size[1] = sizeof(struct lu_range);
ptlrpc_req_set_repsize(req, 2, size);
GOTO(out_req, rc);
out = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
- *space = *out;
+ *output = *out;
- if (!range_is_sane(space)) {
+ if (!range_is_sane(output)) {
CERROR("%s: Invalid range received from server: "
- DRANGE"\n", seq->lcs_name, PRANGE(space));
+ DRANGE"\n", seq->lcs_name, PRANGE(output));
GOTO(out_req, rc = -EINVAL);
}
- if (range_is_exhausted(space)) {
+ if (range_is_exhausted(output)) {
CERROR("%s: Range received from server is exhausted: "
- DRANGE"]\n", seq->lcs_name, PRANGE(space));
+ DRANGE"]\n", seq->lcs_name, PRANGE(output));
GOTO(out_req, rc = -EINVAL);
}
- /* Save server out to request for recovery case. */
+ /*
+ * Save server response to request for recovery case, it will be sent to
+ * server later if needed.
+ */
*in = *out;
CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n",
- seq->lcs_name, opcname, PRANGE(space));
+ seq->lcs_name, opcname, PRANGE(output));
EXIT;
out_req:
return rc;
}
-/* request sequence-controller node to allocate new super-sequence. */
+/* Request sequence-controller node to allocate new super-sequence. */
static int __seq_client_alloc_super(struct lu_client_seq *seq,
const struct lu_env *env)
{
if (seq->lcs_srv) {
LASSERT(env != NULL);
rc = seq_server_alloc_super(seq->lcs_srv, NULL,
- &seq->lcs_space,
- env);
+ &seq->lcs_space, env);
} else {
#endif
- rc = seq_client_rpc(seq, &seq->lcs_space,
+ rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
SEQ_ALLOC_SUPER, "super");
#ifdef __KERNEL__
}
}
EXPORT_SYMBOL(seq_client_alloc_super);
-/* request sequence-controller node to allocate new meta-sequence. */
+/* Request sequence-controller node to allocate new super-sequence. */
+static int __seq_client_replay_super(struct lu_client_seq *seq,
+ struct lu_range *range,
+ const struct lu_env *env)
+{
+ int rc = 0;
+
+#ifdef __KERNEL__
+ if (seq->lcs_srv) {
+ LASSERT(env != NULL);
+ rc = seq_server_alloc_super(seq->lcs_srv, range,
+ &seq->lcs_space, env);
+ } else {
+#endif
+#if 0
+ /*
+ * XXX: Seems we do not need to replay in case of remote
+ * controller. Lustre anyway supports onlu signle failure
+ * recovery.
+ */
+ rc = seq_client_rpc(seq, range, &seq->lcs_space,
+ SEQ_ALLOC_SUPER, "super");
+#endif
+#ifdef __KERNEL__
+ }
+#endif
+ return rc;
+}
+
+int seq_client_replay_super(struct lu_client_seq *seq,
+ struct lu_range *range,
+ const struct lu_env *env)
+{
+ int rc;
+ ENTRY;
+
+ down(&seq->lcs_sem);
+ rc = __seq_client_replay_super(seq, range, env);
+ up(&seq->lcs_sem);
+
+ RETURN(rc);
+}
+
+/* Request sequence-controller node to allocate new meta-sequence. */
static int __seq_client_alloc_meta(struct lu_client_seq *seq,
const struct lu_env *env)
{
env);
} else {
#endif
- rc = seq_client_rpc(seq, &seq->lcs_space,
+ rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
SEQ_ALLOC_META, "meta");
#ifdef __KERNEL__
}
GOTO(out, rc);
}
- /* init new fid */
seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
seq->lcs_fid.f_seq = seqnr;
seq->lcs_fid.f_ver = 0;
*/
rc = 1;
- CDEBUG(D_INFO|D_WARNING, "%s: Switch to sequence ["LPX64"]\n",
- seq->lcs_name, seqnr);
+ CDEBUG(D_INFO|D_WARNING, "%s: Switch to sequence "
+ "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
} else {
seq->lcs_fid.f_oid++;
rc = 0;
*fid = seq->lcs_fid;
LASSERT(fid_is_sane(fid));
- CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid));
+ CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,
+ PFID(fid));
EXIT;
out: