1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/mdc/mdc_fid.c
40 * Author: Yury Umanets <umka@clusterfs.com>
44 # define EXPORT_SYMTAB
46 #define DEBUG_SUBSYSTEM S_FID
49 # include <libcfs/libcfs.h>
50 # include <linux/module.h>
51 #else /* __KERNEL__ */
52 # include <liblustre.h>
56 #include <obd_class.h>
57 #include <obd_support.h>
58 #include "mdc_internal.h"
/*
 * Issue a synchronous SEQ_QUERY RPC to the sequence server on behalf of
 * client sequence manager @seq.
 *
 * @input   range sent to the server (zeroed below; presumably only
 *          meaningful during recovery -- NOTE(review): the branch using
 *          @input is not visible in this excerpt, confirm)
 * @output  range handed back by the server, unpacked from the reply and
 *          validated before use
 * @opc     sequence opcode (e.g. SEQ_ALLOC_META)
 *
 * Returns 0 on success or a negative errno; -EINVAL when the server
 * returns an insane or already-exhausted range.
 */
60 static int seq_client_rpc(struct lu_client_seq *seq, struct lu_seq_range *input,
61 struct lu_seq_range *output, __u32 opc,
/* Request buffer layout: ptlrpc body, opcode slot, input range.
 * NOTE(review): the middle size[] initializer line is not visible here;
 * from the __u32 opcode buffer below it is presumably sizeof(__u32). */
65 __u32 size[3] = { sizeof(struct ptlrpc_body),
67 sizeof(struct lu_seq_range) };
68 struct obd_export *exp = seq->lcs_exp;
69 struct ptlrpc_request *req;
70 struct lu_seq_range *out, *in;
/* Build the SEQ_QUERY request with the 3 buffers described by size[]. */
74 req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
75 SEQ_QUERY, 3, size, NULL);
/* Pin the export for the lifetime of the request. */
79 req->rq_export = class_export_get(exp);
/* Opcode lives in the first record buffer of the request message. */
80 op = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(__u32));
83 /* Zero out input range, this is not recovery yet. */
84 in = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
85 sizeof(struct lu_seq_range));
/* Reply carries the ptlrpc body plus the allocated range. */
91 size[1] = sizeof(struct lu_seq_range);
92 ptlrpc_req_set_repsize(req, 2, size);
/* mdc only ever allocates metadata sequences. */
94 LASSERT(seq->lcs_type == LUSTRE_SEQ_METADATA);
95 req->rq_request_portal = SEQ_METADATA_PORTAL;
/* Serialize with other modifying MDC RPCs while waiting for the reply. */
97 mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
98 rc = ptlrpc_queue_wait(req);
99 mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
/* Unpack the server-allocated range from the reply message.
 * NOTE(review): the copy from *out into *output is not visible in this
 * excerpt; the checks below operate on @output -- confirm it is filled. */
104 out = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
105 sizeof(struct lu_seq_range));
/* Never trust a range from the wire without sanity-checking it. */
108 if (!range_is_sane(output)) {
109 CERROR("%s: Invalid range received from server: "
110 DRANGE"\n", seq->lcs_name, PRANGE(output));
111 GOTO(out_req, rc = -EINVAL);
/* An exhausted range would leave nothing to allocate from. */
114 if (range_is_exhausted(output)) {
115 CERROR("%s: Range received from server is exhausted: "
116 DRANGE"]\n", seq->lcs_name, PRANGE(output));
117 GOTO(out_req, rc = -EINVAL);
121 CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n",
122 seq->lcs_name, opcname, PRANGE(output));
/* out_req cleanup: drop our reference on the request. */
126 ptlrpc_req_finished(req);
131 /* Request sequence-controller node to allocate new meta-sequence. */
132 static int seq_client_alloc_meta(struct lu_client_seq *seq)
/* No input range (NULL): the server picks; the fresh range of sequence
 * numbers is stored directly into seq->lcs_space. */
137 rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
138 SEQ_ALLOC_META, "meta");
142 /* Allocate new sequence for client.
 *
 * Hands out the next sequence number from the locally cached range
 * (seq->lcs_space) via @seqnr, refilling the range from the sequence
 * controller first when it is exhausted. Returns 0 on success,
 * negative errno if the refill RPC fails. */
143 static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
148 LASSERT(range_is_sane(&seq->lcs_space));
/* Cached range used up -- go to the controller for a new one. */
150 if (range_is_exhausted(&seq->lcs_space)) {
151 rc = seq_client_alloc_meta(seq);
153 CERROR("%s: Can't allocate new meta-sequence, "
154 "rc %d\n", seq->lcs_name, rc);
157 CDEBUG(D_INFO, "%s: New range - "DRANGE"\n",
158 seq->lcs_name, PRANGE(&seq->lcs_space));
/* After a successful refill the range must have room. */
164 LASSERT(!range_is_exhausted(&seq->lcs_space));
/* Consume one sequence number from the front of the range. */
165 *seqnr = seq->lcs_space.lsr_start;
166 seq->lcs_space.lsr_start += 1;
168 CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
174 /* Allocate new fid on passed client @seq and save it to @fid.
 *
 * Normally just bumps the object id (f_oid) of the cached fid; when the
 * cached fid is unset or its oid has reached seq->lcs_width, a whole new
 * sequence is allocated first and the fid restarts at
 * LUSTRE_FID_INIT_OID. NOTE(review): the return value on the
 * sequence-switch path (the "inform caller" comment below) is on a line
 * not visible in this excerpt -- presumably nonzero; confirm against
 * callers before relying on it. */
175 static int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
180 LASSERT(seq != NULL);
181 LASSERT(fid != NULL);
/* Need a new sequence: either no fid yet, or the current sequence has
 * handed out lcs_width objects already. */
185 if (fid_is_zero(&seq->lcs_fid) ||
186 fid_oid(&seq->lcs_fid) >= seq->lcs_width)
190 rc = seq_client_alloc_seq(seq, &seqnr);
192 CERROR("%s: Can't allocate new sequence, "
193 "rc %d\n", seq->lcs_name, rc);
198 CDEBUG(D_INFO, "%s: Switch to sequence "
199 "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
/* Start the new sequence at the first valid oid, version 0. */
201 seq->lcs_fid.f_seq = seqnr;
202 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
203 seq->lcs_fid.f_ver = 0;
206 * Inform caller that sequence switch is performed to allow it
207 * to setup FLD for it.
211 /* Just bump last allocated fid and return to caller. */
212 seq->lcs_fid.f_oid += 1;
219 CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name, PFID(fid));
224 * Finish the current sequence due to disconnect.
225 * See mdc_import_event()
/* Drops all locally cached allocation state: the next fid allocation
 * will be forced to fetch a fresh range from the server. */
227 static void seq_client_flush(struct lu_client_seq *seq)
229 LASSERT(seq != NULL);
/* Clear the cached fid and the cached sequence range. */
231 fid_init(&seq->lcs_fid);
232 range_init(&seq->lcs_space);
/* Set up the /proc entries for this client sequence manager.
 * NOTE(review): the bodies of these two helpers are not visible in this
 * excerpt; they may be no-op stubs in non-procfs builds -- confirm. */
236 static int seq_client_proc_init(struct lu_client_seq *seq)
/* Tear down whatever seq_client_proc_init() registered. */
241 static void seq_client_proc_fini(struct lu_client_seq *seq)
/* Initialize client sequence manager @seq against export @exp.
 *
 * Records the sequence type and allocation width, takes its own
 * reference on the export, and registers the /proc entries. On proc
 * setup failure the manager is torn down again via seq_client_fini().
 * Returns 0 on success, negative errno otherwise. */
246 int seq_client_init(struct lu_client_seq *seq,
247 struct obd_export *exp,
248 enum lu_cli_type type,
255 LASSERT(seq != NULL);
256 LASSERT(prefix != NULL);
259 seq->lcs_type = type;
/* Binary semaphore protecting allocation state (lcs_fid/lcs_space). */
260 sema_init(&seq->lcs_sem, 1);
261 seq->lcs_width = width;
263 /* Make sure that things are clear before work is started. */
264 seq_client_flush(seq);
/* NOTE(review): lcs_exp is asserted non-NULL here but its assignment
 * from @exp is on a line not visible in this excerpt -- confirm. */
266 LASSERT(seq->lcs_exp != NULL);
/* Hold our own reference; released in seq_client_fini(). */
267 seq->lcs_exp = class_export_get(seq->lcs_exp);
269 snprintf(seq->lcs_name, sizeof(seq->lcs_name),
272 rc = seq_client_proc_init(seq);
/* Proc setup failed: undo everything done above. */
274 seq_client_fini(seq);
/* Tear down client sequence manager @seq: remove /proc entries and drop
 * the export reference taken in seq_client_init(). */
278 void seq_client_fini(struct lu_client_seq *seq)
282 seq_client_proc_fini(seq);
/* NOTE(review): the LASSERT makes the NULL check below redundant --
 * one of the two could go (left as-is in this review). */
283 LASSERT(seq->lcs_exp != NULL);
285 if (seq->lcs_exp != NULL) {
286 class_export_put(seq->lcs_exp);
293 /* Allocate new fid on passed client @seq and save it to @fid. */
/* Public entry point: thin wrapper around seq_client_alloc_fid(). */
294 int mdc_fid_alloc(struct lu_client_seq *seq, struct lu_fid *fid)
299 rc = seq_client_alloc_fid(seq, fid);
/* Convert fid @src from CPU byte order into little-endian @dst
 * (wire/disk format). @src and @dst may alias. */
305 void fid_cpu_to_le(struct lu_fid *dst, const struct lu_fid *src)
307 /* check that all fields are converted */
308 CLASSERT(sizeof *src ==
309 sizeof fid_seq(src) +
310 sizeof fid_oid(src) + sizeof fid_ver(src));
/* Only igif fids may carry a nonzero version. */
311 LASSERTF(fid_is_igif(src) || fid_ver(src) == 0, DFID"\n", PFID(src));
312 dst->f_seq = cpu_to_le64(fid_seq(src));
313 dst->f_oid = cpu_to_le32(fid_oid(src));
314 dst->f_ver = cpu_to_le32(fid_ver(src));
316 EXPORT_SYMBOL(fid_cpu_to_le);
/* Convert fid @src from little-endian (wire/disk format) into CPU byte
 * order @dst. @src and @dst may alias. */
318 void fid_le_to_cpu(struct lu_fid *dst, const struct lu_fid *src)
320 /* check that all fields are converted */
321 CLASSERT(sizeof *src ==
322 sizeof fid_seq(src) +
323 sizeof fid_oid(src) + sizeof fid_ver(src));
324 dst->f_seq = le64_to_cpu(fid_seq(src));
325 dst->f_oid = le32_to_cpu(fid_oid(src));
326 dst->f_ver = le32_to_cpu(fid_ver(src));
/* Validate after conversion: only igif fids may have a version. */
327 LASSERTF(fid_is_igif(dst) || fid_ver(dst) == 0, DFID"\n", PFID(dst));
329 EXPORT_SYMBOL(fid_le_to_cpu);
/* Convert sequence range @src from CPU byte order to little-endian @dst.
 * NOTE(review): the CLASSERT accounts for four fields but only the
 * lsr_start/lsr_end conversions are visible in this excerpt -- confirm
 * lsr_mdt is converted on the line(s) not shown. */
331 void range_cpu_to_le(struct lu_seq_range *dst, const struct lu_seq_range *src)
333 /* check that all fields are converted */
334 CLASSERT(sizeof(*src) ==
335 sizeof(src->lsr_start) +
336 sizeof(src->lsr_end) +
337 sizeof(src->lsr_mdt) +
338 sizeof(src->lsr_padding));
339 dst->lsr_start = cpu_to_le64(src->lsr_start);
340 dst->lsr_end = cpu_to_le64(src->lsr_end);
342 EXPORT_SYMBOL(range_cpu_to_le);
/* Convert sequence range @src from little-endian to CPU byte order @dst.
 * NOTE(review): as with range_cpu_to_le(), the lsr_mdt conversion is not
 * visible in this excerpt -- confirm it is present. */
344 void range_le_to_cpu(struct lu_seq_range *dst, const struct lu_seq_range *src)
346 /* check that all fields are converted */
347 CLASSERT(sizeof(*src) ==
348 sizeof(src->lsr_start) +
349 sizeof(src->lsr_end) +
350 sizeof(src->lsr_mdt) +
351 sizeof(src->lsr_padding));
353 dst->lsr_start = le64_to_cpu(src->lsr_start);
354 dst->lsr_end = le64_to_cpu(src->lsr_end);
356 EXPORT_SYMBOL(range_le_to_cpu);
/* Convert sequence range @src from CPU byte order to big-endian @dst.
 * NOTE(review): lsr_mdt conversion not visible in this excerpt --
 * confirm it is present. */
358 void range_cpu_to_be(struct lu_seq_range *dst, const struct lu_seq_range *src)
360 /* check that all fields are converted */
361 CLASSERT(sizeof(*src) ==
362 sizeof(src->lsr_start) +
363 sizeof(src->lsr_end) +
364 sizeof(src->lsr_mdt) +
365 sizeof(src->lsr_padding));
367 dst->lsr_start = cpu_to_be64(src->lsr_start);
368 dst->lsr_end = cpu_to_be64(src->lsr_end);
370 EXPORT_SYMBOL(range_cpu_to_be);
/* Convert sequence range @src from big-endian to CPU byte order @dst.
 * NOTE(review): lsr_mdt conversion not visible in this excerpt --
 * confirm it is present. */
372 void range_be_to_cpu(struct lu_seq_range *dst, const struct lu_seq_range *src)
374 /* check that all fields are converted */
375 CLASSERT(sizeof(*src) ==
376 sizeof(src->lsr_start) +
377 sizeof(src->lsr_end) +
378 sizeof(src->lsr_mdt) +
379 sizeof(src->lsr_padding));
381 dst->lsr_start = be64_to_cpu(src->lsr_start);
382 dst->lsr_end = be64_to_cpu(src->lsr_end);
384 EXPORT_SYMBOL(range_be_to_cpu);
387 * Build (DLM) resource name from fid.
/* Packs the fid's seq/oid/ver fields into fixed slots of @name so that
 * each object maps to a unique lock resource. */
390 fid_build_reg_res_name(const struct lu_fid *f, struct ldlm_res_id *name)
/* Unused name slots must be zero for resource comparison to work. */
392 memset(name, 0, sizeof *name);
393 name->name[LUSTRE_RES_ID_SEQ_OFF] = fid_seq(f);
394 name->name[LUSTRE_RES_ID_OID_OFF] = fid_oid(f);
396 name->name[LUSTRE_RES_ID_VER_OFF] = fid_ver(f);
399 EXPORT_SYMBOL(fid_build_reg_res_name);
402 * Return true if resource is for object identified by fid.
/* Inverse check of fid_build_reg_res_name(): compare each packed slot.
 * NOTE(review): the condition guarding the version comparison is on a
 * line not visible in this excerpt. */
404 int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
408 ret = name->name[LUSTRE_RES_ID_SEQ_OFF] == fid_seq(f) &&
409 name->name[LUSTRE_RES_ID_OID_OFF] == fid_oid(f);
411 ret = ret && name->name[LUSTRE_RES_ID_VER_OFF] == fid_ver(f);
414 EXPORT_SYMBOL(fid_res_name_eq);