1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/fid/fid_request.c
5 * Lustre Sequence Manager
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Yury Umanets <umka@clusterfs.com>
10 * This file is part of the Lustre file system, http://www.lustre.org
11 * Lustre is a trademark of Cluster File Systems, Inc.
13 * You may have signed or agreed to another license before downloading
14 * this software. If so, you are bound by the terms and conditions
15 * of that agreement, and the following does not apply to you. See the
16 * LICENSE file included with this distribution for more information.
18 * If you did not agree to a different license, then this copy of Lustre
19 * is open source software; you can redistribute it and/or modify it
20 * under the terms of version 2 of the GNU General Public License as
21 * published by the Free Software Foundation.
23 * In either case, Lustre is distributed in the hope that it will be
24 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
26 * license text for more details.
30 # define EXPORT_SYMTAB
32 #define DEBUG_SUBSYSTEM S_FID
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 #include "fid_internal.h"
50 static int seq_client_rpc(struct lu_client_seq *seq,
51 struct lu_range *range,
52 __u32 opc, const char *opcname)
54 int rc, size[3] = { sizeof(struct ptlrpc_body),
56 sizeof(struct lu_range) };
57 struct obd_export *exp = seq->lcs_exp;
58 struct ptlrpc_request *req;
59 struct lu_range *out, *in;
60 struct req_capsule pill;
64 req = ptlrpc_prep_req(class_exp2cliimp(exp),
71 req_capsule_init(&pill, req, RCL_CLIENT, NULL);
72 req_capsule_set(&pill, &RQF_SEQ_QUERY);
74 /* init operation code */
75 op = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
78 /* zero out input range, this is not recovery yet. */
79 in = req_capsule_client_get(&pill, &RMF_SEQ_RANGE);
82 size[1] = sizeof(struct lu_range);
83 ptlrpc_req_set_repsize(req, 2, size);
85 if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
86 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
87 SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL;
89 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
90 SEQ_CONTROLLER_PORTAL : SEQ_DATA_PORTAL;
93 rc = ptlrpc_queue_wait(req);
97 out = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
100 if (!range_is_sane(range)) {
101 CERROR("%s: Invalid range received from server: "
102 DRANGE"\n", seq->lcs_name, PRANGE(range));
103 GOTO(out_req, rc = -EINVAL);
106 if (range_is_exhausted(range)) {
107 CERROR("%s: Range received from server is exhausted: "
108 DRANGE"]\n", seq->lcs_name, PRANGE(range));
109 GOTO(out_req, rc = -EINVAL);
112 /* Save server out to request for recovery case. */
115 CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n",
116 seq->lcs_name, opcname, PRANGE(range));
120 req_capsule_fini(&pill);
121 ptlrpc_req_finished(req);
125 /* request sequence-controller node to allocate new super-sequence. */
126 static int __seq_client_alloc_super(struct lu_client_seq *seq,
127 const struct lu_env *env)
133 LASSERT(env != NULL);
134 rc = seq_server_alloc_super(seq->lcs_srv, NULL,
139 rc = seq_client_rpc(seq, &seq->lcs_range,
140 SEQ_ALLOC_SUPER, "super");
147 int seq_client_alloc_super(struct lu_client_seq *seq,
148 const struct lu_env *env)
154 rc = __seq_client_alloc_super(seq, env);
159 EXPORT_SYMBOL(seq_client_alloc_super);
161 /* request sequence-controller node to allocate new meta-sequence. */
162 static int __seq_client_alloc_meta(struct lu_client_seq *seq,
163 const struct lu_env *env)
169 LASSERT(env != NULL);
170 rc = seq_server_alloc_meta(seq->lcs_srv, NULL,
175 rc = seq_client_rpc(seq, &seq->lcs_range,
176 SEQ_ALLOC_META, "meta");
183 int seq_client_alloc_meta(struct lu_client_seq *seq,
184 const struct lu_env *env)
190 rc = __seq_client_alloc_meta(seq, env);
195 EXPORT_SYMBOL(seq_client_alloc_meta);
197 /* allocate new sequence for client (llite or MDC are expected to use this) */
198 static int __seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
203 LASSERT(range_is_sane(&seq->lcs_range));
205 /* if we still have free sequences in meta-sequence we allocate new seq
206 * from given range, if not - allocate new meta-sequence. */
207 if (range_space(&seq->lcs_range) == 0) {
208 rc = __seq_client_alloc_meta(seq, NULL);
210 CERROR("%s: Can't allocate new meta-sequence, "
211 "rc %d\n", seq->lcs_name, rc);
216 LASSERT(range_space(&seq->lcs_range) > 0);
217 *seqnr = seq->lcs_range.lr_start;
218 seq->lcs_range.lr_start++;
220 CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n",
221 seq->lcs_name, *seqnr);
225 int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
231 rc = __seq_client_alloc_seq(seq, seqnr);
236 EXPORT_SYMBOL(seq_client_alloc_seq);
238 int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
243 LASSERT(fid != NULL);
247 if (!fid_is_sane(&seq->lcs_fid) ||
248 fid_oid(&seq->lcs_fid) >= seq->lcs_width)
252 /* allocate new sequence for case client has no sequence at all
253 * or sequence is exhausted and should be switched. */
254 rc = __seq_client_alloc_seq(seq, &seqnr);
256 CERROR("%s: Can't allocate new sequence, "
257 "rc %d\n", seq->lcs_name, rc);
262 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
263 seq->lcs_fid.f_seq = seqnr;
264 seq->lcs_fid.f_ver = 0;
266 /* inform caller that sequence switch is performed to allow it
267 * to setup FLD for it. */
270 seq->lcs_fid.f_oid++;
275 LASSERT(fid_is_sane(fid));
277 CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n",
278 seq->lcs_name, PFID(fid));
285 EXPORT_SYMBOL(seq_client_alloc_fid);
287 static void seq_client_proc_fini(struct lu_client_seq *seq);
290 static int seq_client_proc_init(struct lu_client_seq *seq)
295 seq->lcs_proc_dir = lprocfs_register(seq->lcs_name,
299 if (IS_ERR(seq->lcs_proc_dir)) {
300 CERROR("%s: LProcFS failed in seq-init\n",
302 rc = PTR_ERR(seq->lcs_proc_dir);
306 rc = lprocfs_add_vars(seq->lcs_proc_dir,
307 seq_client_proc_list, seq);
309 CERROR("%s: Can't init sequence manager "
310 "proc, rc %d\n", seq->lcs_name, rc);
311 GOTO(out_cleanup, rc);
317 seq_client_proc_fini(seq);
321 static void seq_client_proc_fini(struct lu_client_seq *seq)
324 if (seq->lcs_proc_dir) {
325 if (!IS_ERR(seq->lcs_proc_dir))
326 lprocfs_remove(seq->lcs_proc_dir);
327 seq->lcs_proc_dir = NULL;
332 static int seq_client_proc_init(struct lu_client_seq *seq)
337 static void seq_client_proc_fini(struct lu_client_seq *seq)
343 int seq_client_init(struct lu_client_seq *seq,
344 struct obd_export *exp,
345 enum lu_cli_type type,
347 struct lu_server_seq *srv)
352 LASSERT(seq != NULL);
353 LASSERT(prefix != NULL);
357 seq->lcs_type = type;
358 fid_zero(&seq->lcs_fid);
359 range_zero(&seq->lcs_range);
360 sema_init(&seq->lcs_sem, 1);
361 seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH;
364 LASSERT(seq->lcs_srv != NULL);
366 LASSERT(seq->lcs_exp != NULL);
367 seq->lcs_exp = class_export_get(seq->lcs_exp);
370 snprintf(seq->lcs_name, sizeof(seq->lcs_name),
373 rc = seq_client_proc_init(seq);
375 seq_client_fini(seq);
378 EXPORT_SYMBOL(seq_client_init);
380 void seq_client_fini(struct lu_client_seq *seq)
384 seq_client_proc_fini(seq);
386 if (seq->lcs_exp != NULL) {
387 class_export_put(seq->lcs_exp);
394 EXPORT_SYMBOL(seq_client_fini);