Whamcloud - gitweb
b=15957
[fs/lustre-release.git] / lustre / fid / fid_handler.c
index 9e3d7ad..5af9343 100644 (file)
@@ -93,6 +93,7 @@ int seq_server_set_cli(struct lu_server_seq *seq,
                seq->lss_name, cli->lcs_name);
 
         seq->lss_cli = cli;
+        cli->lcs_space.lsr_mdt = seq->lss_site->ms_node_id;
         EXIT;
 out_up:
         up(&seq->lss_sem);
@@ -100,16 +101,22 @@ out_up:
 }
 EXPORT_SYMBOL(seq_server_set_cli);
 
-/*
+/**
  * On controller node, allocate new super sequence for regular sequence server.
+ * As this super sequence controller, this node suppose to maintain fld
+ * and update index.
+ * \a out range always has currect mds node number of requester.
  */
+
 static int __seq_server_alloc_super(struct lu_server_seq *seq,
-                                    struct lu_range *in,
-                                    struct lu_range *out,
+                                    struct lu_seq_range *in,
+                                    struct lu_seq_range *out,
                                     const struct lu_env *env)
 {
-        struct lu_range *space = &seq->lss_space;
-        int rc;
+        struct lu_seq_range *space = &seq->lss_space;
+        struct thandle *th;
+        __u64 mdt = out->lsr_mdt;
+        int rc, credit;
         ENTRY;
 
         LASSERT(range_is_sane(space));
@@ -118,8 +125,8 @@ static int __seq_server_alloc_super(struct lu_server_seq *seq,
                 CDEBUG(D_INFO, "%s: Input seq range: "
                        DRANGE"\n", seq->lss_name, PRANGE(in));
 
-                if (in->lr_end > space->lr_start)
-                        space->lr_start = in->lr_end;
+                if (in->lsr_end > space->lsr_start)
+                        space->lsr_start = in->lsr_end;
                 *out = *in;
 
                 CDEBUG(D_INFO, "%s: Recovered space: "DRANGE"\n",
@@ -130,7 +137,7 @@ static int __seq_server_alloc_super(struct lu_server_seq *seq,
                               "Only "LPU64" sequences left\n", seq->lss_name,
                               range_space(space));
                         *out = *space;
-                        space->lr_start = space->lr_end;
+                        space->lsr_start = space->lsr_end;
                 } else if (range_is_exhausted(space)) {
                         CERROR("%s: Sequences space is exhausted\n",
                                seq->lss_name);
@@ -139,23 +146,40 @@ static int __seq_server_alloc_super(struct lu_server_seq *seq,
                         range_alloc(out, space, seq->lss_width);
                 }
         }
+        out->lsr_mdt = mdt;
+
+        credit = SEQ_TXN_STORE_CREDITS + FLD_TXN_INDEX_INSERT_CREDITS;
+
+        th = seq_store_trans_start(seq, env, credit);
+        if (IS_ERR(th))
+                RETURN(PTR_ERR(th));
 
-        rc = seq_store_write(seq, env);
+        rc = seq_store_write(seq, env, th);
         if (rc) {
                 CERROR("%s: Can't write space data, rc %d\n",
                        seq->lss_name, rc);
-                RETURN(rc);
+                goto out;
         }
 
-        CDEBUG(D_INFO, "%s: Allocated super-sequence "
-               DRANGE"\n", seq->lss_name, PRANGE(out));
+        rc = fld_server_create(seq->lss_site->ms_server_fld,
+                               env, out, th);
+        if (rc) {
+                CERROR("%s: Can't Update fld database, rc %d\n",
+                       seq->lss_name, rc);
+        }
+
+out:
+        seq_store_trans_stop(seq, env, th);
+
+        CDEBUG(D_INFO, "%s: super-sequence allocation rc = %d "
+               DRANGE"\n", seq->lss_name, rc, PRANGE(out));
 
         RETURN(rc);
 }
 
 int seq_server_alloc_super(struct lu_server_seq *seq,
-                           struct lu_range *in,
-                           struct lu_range *out,
+                           struct lu_seq_range *in,
+                           struct lu_seq_range *out,
                            const struct lu_env *env)
 {
         int rc;
@@ -169,12 +193,14 @@ int seq_server_alloc_super(struct lu_server_seq *seq,
 }
 
 static int __seq_server_alloc_meta(struct lu_server_seq *seq,
-                                   struct lu_range *in,
-                                   struct lu_range *out,
+                                   struct lu_seq_range *in,
+                                   struct lu_seq_range *out,
                                    const struct lu_env *env)
 {
-        struct lu_range *space = &seq->lss_space;
+        struct lu_seq_range *space = &seq->lss_space;
+        struct thandle *th;
         int rc = 0;
+
         ENTRY;
 
         LASSERT(range_is_sane(space));
@@ -193,22 +219,22 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
                          * we check here that range from client is "newer" than
                          * exhausted super.
                          */
-                        LASSERT(in->lr_end > space->lr_start);
+                        LASSERT(in->lsr_end > space->lsr_start);
 
                         /*
                          * Start is set to end of last allocated, because it
                          * *is* already allocated so we take that into account
                          * and do not use for other allocations.
                          */
-                        space->lr_start = in->lr_end;
+                        space->lsr_start = in->lsr_end;
 
                         /*
-                         * End is set to in->lr_start + super sequence
-                         * allocation unit. That is because in->lr_start is
+                         * End is set to in->lsr_start + super sequence
+                         * allocation unit. That is because in->lsr_start is
                          * first seq in new allocated range from controller
                          * before failure.
                          */
-                        space->lr_end = in->lr_start + LUSTRE_SEQ_SUPER_WIDTH;
+                        space->lsr_end = in->lsr_start + LUSTRE_SEQ_SUPER_WIDTH;
 
                         if (!seq->lss_cli) {
                                 CERROR("%s: No sequence controller "
@@ -221,6 +247,7 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
                          * obtained range from it was @space.
                          */
                         rc = seq_client_replay_super(seq->lss_cli, space, env);
+
                         if (rc) {
                                 CERROR("%s: Can't replay super-sequence, "
                                        "rc %d\n", seq->lss_name, rc);
@@ -231,8 +258,8 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
                          * Update super start by end from client's range. Super
                          * end should not be changed if range was not exhausted.
                          */
-                        if (in->lr_end > space->lr_start)
-                                space->lr_start = in->lr_end;
+                        if (in->lsr_end > space->lsr_start)
+                                space->lsr_start = in->lsr_end;
                 }
 
                 *out = *in;
@@ -266,7 +293,11 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
                 range_alloc(out, space, seq->lss_width);
         }
 
-        rc = seq_store_write(seq, env);
+        th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS);
+        if (IS_ERR(th))
+                RETURN(PTR_ERR(th));
+
+        rc = seq_store_write(seq, env, th);
         if (rc) {
                 CERROR("%s: Can't write space data, rc %d\n",
                       seq->lss_name, rc);
@@ -277,12 +308,13 @@ static int __seq_server_alloc_meta(struct lu_server_seq *seq,
                        DRANGE"\n", seq->lss_name, PRANGE(out));
         }
 
+        seq_store_trans_stop(seq, env, th);
         RETURN(rc);
 }
 
 int seq_server_alloc_meta(struct lu_server_seq *seq,
-                          struct lu_range *in,
-                          struct lu_range *out,
+                          struct lu_seq_range *in,
+                          struct lu_seq_range *out,
                           const struct lu_env *env)
 {
         int rc;
@@ -298,8 +330,8 @@ EXPORT_SYMBOL(seq_server_alloc_meta);
 
 static int seq_server_handle(struct lu_site *site,
                              const struct lu_env *env,
-                             __u32 opc, struct lu_range *in,
-                             struct lu_range *out)
+                             __u32 opc, struct lu_seq_range *in,
+                             struct lu_seq_range *out)
 {
         int rc;
         struct md_site *mite;
@@ -337,7 +369,7 @@ static int seq_req_handle(struct ptlrpc_request *req,
                           const struct lu_env *env,
                           struct seq_thread_info *info)
 {
-        struct lu_range *out, *in = NULL;
+        struct lu_seq_range *out, *in = NULL, *tmp;
         struct lu_site *site;
         int rc = -EPROTO;
         __u32 *opc;
@@ -356,13 +388,16 @@ static int seq_req_handle(struct ptlrpc_request *req,
                 if (out == NULL)
                         RETURN(err_serious(-EPROTO));
 
-                if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
-                        in = req_capsule_client_get(info->sti_pill,
-                                                    &RMF_SEQ_RANGE);
+                tmp = req_capsule_client_get(info->sti_pill, &RMF_SEQ_RANGE);
 
+                if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
+                        in = tmp;
                         LASSERT(!range_is_zero(in) && range_is_sane(in));
                 }
+                /* seq client passed mdt id, we need to pass that using out
+                 * range parameter */
 
+                out->lsr_mdt = tmp->lsr_mdt;
                 rc = seq_server_handle(site, env, *opc, in, out);
         } else
                 rc = err_serious(-EPROTO);
@@ -475,8 +510,10 @@ int seq_server_init(struct lu_server_seq *seq,
                     struct dt_device *dev,
                     const char *prefix,
                     enum lu_mgr_type type,
+                    struct md_site *ms,
                     const struct lu_env *env)
 {
+        struct thandle *th;
         int rc, is_srv = (type == LUSTRE_SEQ_SERVER);
         ENTRY;
 
@@ -485,6 +522,7 @@ int seq_server_init(struct lu_server_seq *seq,
 
         seq->lss_cli = NULL;
         seq->lss_type = type;
+        seq->lss_site = ms;
         range_init(&seq->lss_space);
         sema_init(&seq->lss_sem, 1);
 
@@ -497,7 +535,6 @@ int seq_server_init(struct lu_server_seq *seq,
         rc = seq_store_init(seq, env, dev);
         if (rc)
                 GOTO(out, rc);
-
         /* Request backing store for saved sequence info. */
         rc = seq_store_read(seq, env);
         if (rc == -ENODATA) {
@@ -507,16 +544,22 @@ int seq_server_init(struct lu_server_seq *seq,
                         LUSTRE_SEQ_ZERO_RANGE:
                         LUSTRE_SEQ_SPACE_RANGE;
 
+                seq->lss_space.lsr_mdt = ms->ms_node_id;
                 CDEBUG(D_INFO, "%s: No data found "
                        "on store. Initialize space\n",
                        seq->lss_name);
 
+                th = seq_store_trans_start(seq, env, SEQ_TXN_STORE_CREDITS);
+                if (IS_ERR(th))
+                        RETURN(PTR_ERR(th));
+
                 /* Save default controller value to store. */
-                rc = seq_store_write(seq, env);
+                rc = seq_store_write(seq, env, th);
                 if (rc) {
                         CERROR("%s: Can't write space data, "
                                "rc %d\n", seq->lss_name, rc);
                 }
+                seq_store_trans_stop(seq, env, th);
         } else if (rc) {
                CERROR("%s: Can't read space data, rc %d\n",
                       seq->lss_name, rc);