Whamcloud - gitweb
- fixes in convertion functions after Nikita's insp.
[fs/lustre-release.git] / lustre / fid / fid_request.c
index 050a1f2..7b9a24a 100644 (file)
 #include <lustre_fid.h>
 #include "fid_internal.h"
 
-/* XXX: this should use new req-layout interface */
-static int 
-seq_client_rpc(struct lu_client_seq *seq, 
-               struct lu_range *range,
-               __u32 opc)
+static int seq_client_rpc(struct lu_client_seq *seq,
+                          struct lu_range *range,
+                          __u32 opc, const char *opcname)
 {
-        struct obd_export *exp = seq->seq_exp;
-        int repsize = sizeof(struct lu_range);
-        int rc, reqsize = sizeof(__u32);
+        struct obd_export *exp = seq->lcs_exp;
+        int repsize[2] = { sizeof(struct ptlrpc_body),
+                           sizeof(struct lu_range) };
+        int rc, reqsize[2] = { sizeof(struct ptlrpc_body),
+                               sizeof(__u32) };
         struct ptlrpc_request *req;
+        struct req_capsule pill;
         struct lu_range *ran;
         __u32 *op;
         ENTRY;
 
-        req = ptlrpc_prep_req(class_exp2cliimp(exp), 
+        req = ptlrpc_prep_req(class_exp2cliimp(exp),
                              LUSTRE_MDS_VERSION,
-                              SEQ_QUERY, 1, &reqsize,
+                              SEQ_QUERY, 2, reqsize,
                               NULL);
         if (req == NULL)
                 RETURN(-ENOMEM);
 
-        op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*op));
+        req_capsule_init(&pill, req, RCL_CLIENT, NULL);
+
+        req_capsule_set(&pill, &RQF_SEQ_QUERY);
+
+        op = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
         *op = opc;
 
-        req->rq_replen = lustre_msg_size(1, &repsize);
-        
-        req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
-                SEQ_CTLR_PORTAL : SEQ_SRV_PORTAL;
+        ptlrpc_req_set_repsize(req, 2, repsize);
+
+        if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
+                req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
+                        SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL;
+        } else {
+                req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
+                        SEQ_CONTROLLER_PORTAL : SEQ_DATA_PORTAL;
+        }
 
         rc = ptlrpc_queue_wait(req);
         if (rc)
                 GOTO(out_req, rc);
 
-        ran = lustre_swab_repbuf(req, 0, sizeof(*ran),
-                                 lustre_swab_lu_range);
-
-        if (ran == NULL) {
-                CERROR("invalid range is returned\n");
+        ran = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
+        if (ran == NULL)
                 GOTO(out_req, rc = -EPROTO);
-        }
         *range = *ran;
-        
-        LASSERT(range_is_sane(range));
-        LASSERT(!range_is_exhausted(range));
-        
-        EXIT;
-out_req:
-        ptlrpc_req_finished(req); 
-        return rc;
-}
 
-static int
-__seq_client_alloc_opc(struct lu_client_seq *seq,
-                       int opc, const char *opcname)
-{
-        int rc;
-        ENTRY;
+        if (!range_is_sane(range)) {
+                CERROR("invalid seq range obtained from server: "
+                       DRANGE"\n", PRANGE(range));
+                GOTO(out_req, rc = -EINVAL);
+        }
+
+        if (range_is_exhausted(range)) {
+                CERROR("seq range obtained from server is exhausted: "
+                       DRANGE"]\n", PRANGE(range));
+                GOTO(out_req, rc = -EINVAL);
+        }
 
-        rc = seq_client_rpc(seq, &seq->seq_range, opc);
         if (rc == 0) {
-                CDEBUG(D_INFO, "SEQ-MGR(cli): allocated "
-                       "%s-sequence ["LPX64"-"LPX64"]\n",
-                       opcname, seq->seq_range.lr_start,
-                       seq->seq_range.lr_end);
+                CDEBUG(D_INFO, "%s: allocated %s-sequence "
+                       DRANGE"]\n", seq->lcs_name, opcname,
+                       PRANGE(range));
         }
-        RETURN(rc);
+        
+        EXIT;
+out_req:
+        req_capsule_fini(&pill);
+        ptlrpc_req_finished(req);
+        return rc;
 }
 
 /* request sequence-controller node to allocate new super-sequence. */
-static int
-__seq_client_alloc_super(struct lu_client_seq *seq)
+static int __seq_client_alloc_super(struct lu_client_seq *seq)
 {
-        ENTRY;
-        RETURN(__seq_client_alloc_opc(seq, SEQ_ALLOC_SUPER, "super"));
+        return seq_client_rpc(seq, &seq->lcs_range,
+                              SEQ_ALLOC_SUPER, "super");
 }
 
-int
-seq_client_alloc_super(struct lu_client_seq *seq)
+int seq_client_alloc_super(struct lu_client_seq *seq)
 {
         int rc;
         ENTRY;
-        
-        down(&seq->seq_sem);
+
+        down(&seq->lcs_sem);
         rc = __seq_client_alloc_super(seq);
-        up(&seq->seq_sem);
+        up(&seq->lcs_sem);
 
         RETURN(rc);
 }
 EXPORT_SYMBOL(seq_client_alloc_super);
 
 /* request sequence-controller node to allocate new meta-sequence. */
-static int
-__seq_client_alloc_meta(struct lu_client_seq *seq)
+static int __seq_client_alloc_meta(struct lu_client_seq *seq)
 {
-        ENTRY;
-        RETURN(__seq_client_alloc_opc(seq, SEQ_ALLOC_META, "meta"));
+        return seq_client_rpc(seq, &seq->lcs_range,
+                              SEQ_ALLOC_META, "meta");
 }
 
-int
-seq_client_alloc_meta(struct lu_client_seq *seq)
+int seq_client_alloc_meta(struct lu_client_seq *seq)
 {
         int rc;
         ENTRY;
 
-        down(&seq->seq_sem);
+        down(&seq->lcs_sem);
         rc = __seq_client_alloc_meta(seq);
-        up(&seq->seq_sem);
+        up(&seq->lcs_sem);
 
         RETURN(rc);
 }
 EXPORT_SYMBOL(seq_client_alloc_meta);
 
 /* allocate new sequence for client (llite or MDC are expected to use this) */
-static int
-__seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
+static int __seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
 {
         int rc = 0;
         ENTRY;
 
-        LASSERT(range_is_sane(&seq->seq_range));
+        LASSERT(range_is_sane(&seq->lcs_range));
 
         /* if we still have free sequences in meta-sequence we allocate new seq
          * from given range, if not - allocate new meta-sequence. */
-        if (range_space(&seq->seq_range) == 0) {
+        if (range_space(&seq->lcs_range) == 0) {
                 rc = __seq_client_alloc_meta(seq);
                 if (rc) {
                         CERROR("can't allocate new meta-sequence, "
@@ -178,45 +178,45 @@ __seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
                         RETURN(rc);
                 }
         }
-        
-        *seqnr = seq->seq_range.lr_start;
-        seq->seq_range.lr_start++;
-        
-        CDEBUG(D_INFO, "SEQ-MGR(cli): allocated "
-               "sequence ["LPX64"]\n", *seqnr);
+
+        LASSERT(range_space(&seq->lcs_range) > 0);
+        *seqnr = seq->lcs_range.lr_start;
+        seq->lcs_range.lr_start++;
+
+        CDEBUG(D_INFO, "%s: allocated sequence ["LPX64"]\n",
+               seq->lcs_name, *seqnr);
         RETURN(rc);
 }
 
-int
-seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
+int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
 {
         int rc = 0;
         ENTRY;
 
-        down(&seq->seq_sem);
+        down(&seq->lcs_sem);
         rc = __seq_client_alloc_seq(seq, seqnr);
-        up(&seq->seq_sem);
+        up(&seq->lcs_sem);
 
         RETURN(rc);
 }
 EXPORT_SYMBOL(seq_client_alloc_seq);
 
-int
-seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
+int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
 {
-        seqno_t seqnr = 0;
         int rc;
         ENTRY;
 
         LASSERT(fid != NULL);
 
-        down(&seq->seq_sem);
+        down(&seq->lcs_sem);
 
-        if (!fid_is_sane(&seq->seq_fid) ||
-            fid_oid(&seq->seq_fid) >= seq->seq_width)
+        if (!fid_is_sane(&seq->lcs_fid) ||
+            fid_oid(&seq->lcs_fid) >= seq->lcs_width)
         {
-                /* allocate new sequence for case client hass no sequence at all
-                 * or sequnece is exhausted and should be switched. */
+                seqno_t seqnr;
+                
+                /* allocate new sequence for case client has no sequence at all
+                 * or sequence is exhausted and should be switched. */
                 rc = __seq_client_alloc_seq(seq, &seqnr);
                 if (rc) {
                         CERROR("can't allocate new sequence, "
@@ -225,100 +225,107 @@ seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
                 }
 
                 /* init new fid */
-                seq->seq_fid.f_oid = LUSTRE_FID_INIT_OID;
-                seq->seq_fid.f_seq = seqnr;
-                seq->seq_fid.f_ver = 0;
+                seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
+                seq->lcs_fid.f_seq = seqnr;
+                seq->lcs_fid.f_ver = 0;
 
-                /* inform caller that sequnece switch is performed to allow it
+                /* inform caller that sequence switch is performed to allow it
                  * to setup FLD for it. */
-                rc = -ERESTART;
+                rc = 1;
         } else {
-                seq->seq_fid.f_oid++;
+                seq->lcs_fid.f_oid++;
                 rc = 0;
         }
 
-        *fid = seq->seq_fid;
+        *fid = seq->lcs_fid;
         LASSERT(fid_is_sane(fid));
-        
-        CDEBUG(D_INFO, "SEQ-MGR(cli): allocated FID "DFID3"\n",
-               PFID3(fid));
+
+        CDEBUG(D_INFO, "%s: allocated FID "DFID"\n",
+               seq->lcs_name, PFID(fid));
 
         EXIT;
 out:
-        up(&seq->seq_sem);
+        up(&seq->lcs_sem);
         return rc;
 }
 EXPORT_SYMBOL(seq_client_alloc_fid);
 
+static void seq_client_proc_fini(struct lu_client_seq *seq);
+
 #ifdef LPROCFS
-static int
-seq_client_proc_init(struct lu_client_seq *seq)
+static int seq_client_proc_init(struct lu_client_seq *seq)
 {
         int rc;
         ENTRY;
 
-        seq->seq_proc_dir = lprocfs_register(seq->seq_name,
+        seq->lcs_proc_dir = lprocfs_register(seq->lcs_name,
                                              proc_lustre_root,
                                              NULL, NULL);
-        
-        if (IS_ERR(seq->seq_proc_dir)) {
+
+        if (IS_ERR(seq->lcs_proc_dir)) {
                 CERROR("LProcFS failed in seq-init\n");
-                rc = PTR_ERR(seq->seq_proc_dir);
-                GOTO(err, rc);
+                rc = PTR_ERR(seq->lcs_proc_dir);
+                RETURN(rc);
         }
 
-        rc = lprocfs_add_vars(seq->seq_proc_dir,
+        rc = lprocfs_add_vars(seq->lcs_proc_dir,
                               seq_client_proc_list, seq);
         if (rc) {
                 CERROR("can't init sequence manager "
                        "proc, rc %d\n", rc);
-                GOTO(err_dir, rc);
+                GOTO(out_cleanup, rc);
         }
 
         RETURN(0);
 
-err_dir:
-        lprocfs_remove(seq->seq_proc_dir);
-err:
-        seq->seq_proc_dir = NULL;
+out_cleanup:
+        seq_client_proc_fini(seq);
         return rc;
 }
 
-static void
-seq_client_proc_fini(struct lu_client_seq *seq)
+static void seq_client_proc_fini(struct lu_client_seq *seq)
 {
         ENTRY;
-        if (seq->seq_proc_dir) {
-                lprocfs_remove(seq->seq_proc_dir);
-                seq->seq_proc_dir = NULL;
+        if (seq->lcs_proc_dir) {
+                if (!IS_ERR(seq->lcs_proc_dir))
+                        lprocfs_remove(seq->lcs_proc_dir);
+                seq->lcs_proc_dir = NULL;
         }
         EXIT;
 }
+#else
+static int seq_client_proc_init(struct lu_client_seq *seq)
+{
+        return 0;
+}
+
+static void seq_client_proc_fini(struct lu_client_seq *seq)
+{
+        return;
+}
 #endif
 
-int 
-seq_client_init(struct lu_client_seq *seq,
-                const char *uuid,
-                struct obd_export *exp)
+int seq_client_init(struct lu_client_seq *seq,
+                    const char *uuid,
+                    struct obd_export *exp,
+                    enum lu_cli_type type)
 {
         int rc = 0;
         ENTRY;
 
         LASSERT(exp != NULL);
-        
-        fid_zero(&seq->seq_fid);
-        range_zero(&seq->seq_range);
-        sema_init(&seq->seq_sem, 1);
-        seq->seq_exp = class_export_get(exp);
-        seq->seq_width = LUSTRE_SEQ_MAX_WIDTH;
 
-        snprintf(seq->seq_name, sizeof(seq->seq_name),
-                 "%s-%s", LUSTRE_SEQ_NAME, uuid);
+        seq->lcs_type = type;
+        fid_zero(&seq->lcs_fid);
+        range_zero(&seq->lcs_range);
+        sema_init(&seq->lcs_sem, 1);
+        seq->lcs_exp = class_export_get(exp);
+        seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH;
 
-#ifdef LPROCFS
-        rc = seq_client_proc_init(seq);
-#endif
+        snprintf(seq->lcs_name, sizeof(seq->lcs_name),
+                 "%s-cli-%s", LUSTRE_SEQ_NAME, uuid);
 
+        rc = seq_client_proc_init(seq);
         if (rc)
                 seq_client_fini(seq);
         else
@@ -332,17 +339,15 @@ void seq_client_fini(struct lu_client_seq *seq)
 {
         ENTRY;
 
-#ifdef LPROCFS
         seq_client_proc_fini(seq);
-#endif
-        
-        if (seq->seq_exp != NULL) {
-                class_export_put(seq->seq_exp);
-                seq->seq_exp = NULL;
+
+        if (seq->lcs_exp != NULL) {
+                class_export_put(seq->lcs_exp);
+                seq->lcs_exp = NULL;
         }
-        
+
         CDEBUG(D_INFO|D_WARNING, "Client Sequence Manager\n");
-        
+
         EXIT;
 }
 EXPORT_SYMBOL(seq_client_fini);