Whamcloud - gitweb
Branch b1_8
[fs/lustre-release.git] / lustre / mdc / mdc_fid.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdc/mdc_fid.c
37  *
38  * MDC fid management
39  *
40  * Author: Yury Umanets <umka@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46 #define DEBUG_SUBSYSTEM S_FID
47
48 #ifdef __KERNEL__
49 # include <libcfs/libcfs.h>
50 # include <linux/module.h>
51 #else /* __KERNEL__ */
52 # include <liblustre.h>
53 #endif
54
55 #include <obd.h>
56 #include <obd_class.h>
57 #include <obd_support.h>
58 #include "mdc_internal.h"
59
60 static int seq_client_rpc(struct lu_client_seq *seq, struct lu_seq_range *input,
61                           struct lu_seq_range *output, __u32 opc,
62                           const char *opcname)
63 {
64         int rc;
65         __u32 size[3] = { sizeof(struct ptlrpc_body),
66                             sizeof(__u32),
67                             sizeof(struct lu_seq_range) };
68         struct obd_export *exp = seq->lcs_exp;
69         struct ptlrpc_request *req;
70         struct lu_seq_range *out, *in;
71         __u32 *op;
72         ENTRY;
73
74         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
75                               SEQ_QUERY, 3, size, NULL);
76         if (req == NULL)
77                 RETURN(-ENOMEM);
78
79         req->rq_export = class_export_get(exp);
80         op = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, sizeof(__u32));
81         *op = opc;
82
83         /* Zero out input range, this is not recovery yet. */
84         in = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1,
85                             sizeof(struct lu_seq_range));
86         if (input != NULL)
87                 *in = *input;
88         else
89                 range_init(in);
90
91         size[1] = sizeof(struct lu_seq_range);
92         ptlrpc_req_set_repsize(req, 2, size);
93
94         LASSERT(seq->lcs_type == LUSTRE_SEQ_METADATA);
95         req->rq_request_portal = SEQ_METADATA_PORTAL;
96
97         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
98         rc = ptlrpc_queue_wait(req);
99         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
100
101         if (rc)
102                 GOTO(out_req, rc);
103
104         out = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
105                             sizeof(struct lu_seq_range));
106         *output = *out;
107
108         if (!range_is_sane(output)) {
109                 CERROR("%s: Invalid range received from server: "
110                        DRANGE"\n", seq->lcs_name, PRANGE(output));
111                 GOTO(out_req, rc = -EINVAL);
112         }
113
114         if (range_is_exhausted(output)) {
115                 CERROR("%s: Range received from server is exhausted: "
116                        DRANGE"]\n", seq->lcs_name, PRANGE(output));
117                 GOTO(out_req, rc = -EINVAL);
118         }
119         *in = *out;
120
121         CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n",
122                seq->lcs_name, opcname, PRANGE(output));
123
124         EXIT;
125 out_req:
126         ptlrpc_req_finished(req);
127         return rc;
128 }
129
130
131 /* Request sequence-controller node to allocate new meta-sequence. */
132 static int seq_client_alloc_meta(struct lu_client_seq *seq)
133 {
134         int rc;
135         ENTRY;
136
137         rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
138                             SEQ_ALLOC_META, "meta");
139         RETURN(rc);
140 }
141
142 /* Allocate new sequence for client. */
143 static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
144 {
145         int rc;
146         ENTRY;
147
148         LASSERT(range_is_sane(&seq->lcs_space));
149
150         if (range_is_exhausted(&seq->lcs_space)) {
151                 rc = seq_client_alloc_meta(seq);
152                 if (rc) {
153                         CERROR("%s: Can't allocate new meta-sequence, "
154                                "rc %d\n", seq->lcs_name, rc);
155                         RETURN(rc);
156                 } else {
157                         CDEBUG(D_INFO, "%s: New range - "DRANGE"\n",
158                                seq->lcs_name, PRANGE(&seq->lcs_space));
159                 }
160         } else {
161                 rc = 0;
162         }
163
164         LASSERT(!range_is_exhausted(&seq->lcs_space));
165         *seqnr = seq->lcs_space.lsr_start;
166         seq->lcs_space.lsr_start += 1;
167
168         CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
169                *seqnr);
170
171         RETURN(rc);
172 }
173
174 /* Allocate new fid on passed client @seq and save it to @fid. */
175 static int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
176 {
177         int rc;
178         ENTRY;
179
180         LASSERT(seq != NULL);
181         LASSERT(fid != NULL);
182
183         down(&seq->lcs_sem);
184
185         if (fid_is_zero(&seq->lcs_fid) ||
186             fid_oid(&seq->lcs_fid) >= seq->lcs_width)
187         {
188                 seqno_t seqnr;
189
190                 rc = seq_client_alloc_seq(seq, &seqnr);
191                 if (rc) {
192                         CERROR("%s: Can't allocate new sequence, "
193                                "rc %d\n", seq->lcs_name, rc);
194                         up(&seq->lcs_sem);
195                         RETURN(rc);
196                 }
197
198                 CDEBUG(D_INFO, "%s: Switch to sequence "
199                        "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
200
201                 seq->lcs_fid.f_seq = seqnr;
202                 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
203                 seq->lcs_fid.f_ver = 0;
204
205                 /*
206                  * Inform caller that sequence switch is performed to allow it
207                  * to setup FLD for it.
208                  */
209                 rc = 1;
210         } else {
211                 /* Just bump last allocated fid and return to caller. */
212                 seq->lcs_fid.f_oid += 1;
213                 rc = 0;
214         }
215
216         *fid = seq->lcs_fid;
217         up(&seq->lcs_sem);
218
219         CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,  PFID(fid));
220         RETURN(rc);
221 }
222
223 /*
224  * Finish the current sequence due to disconnect.
225  * See mdc_import_event()
226  */
227 static void seq_client_flush(struct lu_client_seq *seq)
228 {
229         LASSERT(seq != NULL);
230         down(&seq->lcs_sem);
231         fid_init(&seq->lcs_fid);
232         range_init(&seq->lcs_space);
233         up(&seq->lcs_sem);
234 }
235
/* Stub: performs no setup for @seq and always reports success (0). */
static int seq_client_proc_init(struct lu_client_seq *seq)
{
        return 0;
}
240
/* Stub counterpart of seq_client_proc_init(): there is nothing to undo. */
static void seq_client_proc_fini(struct lu_client_seq *seq)
{
}
245
246 int seq_client_init(struct lu_client_seq *seq,
247                     struct obd_export *exp,
248                     enum lu_cli_type type,
249                     __u64 width,
250                     const char *prefix)
251 {
252         int rc;
253         ENTRY;
254
255         LASSERT(seq != NULL);
256         LASSERT(prefix != NULL);
257
258         seq->lcs_exp = exp;
259         seq->lcs_type = type;
260         sema_init(&seq->lcs_sem, 1);
261         seq->lcs_width = width;
262
263         /* Make sure that things are clear before work is started. */
264         seq_client_flush(seq);
265
266         LASSERT(seq->lcs_exp != NULL);
267         seq->lcs_exp = class_export_get(seq->lcs_exp);
268
269         snprintf(seq->lcs_name, sizeof(seq->lcs_name),
270                  "cli-%s", prefix);
271
272         rc = seq_client_proc_init(seq);
273         if (rc)
274                 seq_client_fini(seq);
275         RETURN(rc);
276 }
277
278 void seq_client_fini(struct lu_client_seq *seq)
279 {
280         ENTRY;
281
282         seq_client_proc_fini(seq);
283         LASSERT(seq->lcs_exp != NULL);
284
285         if (seq->lcs_exp != NULL) {
286                 class_export_put(seq->lcs_exp);
287                 seq->lcs_exp = NULL;
288         }
289
290         EXIT;
291 }
292
293 /* Allocate new fid on passed client @seq and save it to @fid. */
294 int mdc_fid_alloc(struct lu_client_seq *seq, struct lu_fid *fid)
295 {
296         int rc;
297         ENTRY;
298         
299         rc = seq_client_alloc_fid(seq, fid);
300         if (rc > 0)
301                 rc = 0;
302         RETURN(rc);
303 }
304
305 void fid_cpu_to_le(struct lu_fid *dst, const struct lu_fid *src)
306 {
307         /* check that all fields are converted */
308         CLASSERT(sizeof *src ==
309                  sizeof fid_seq(src) +
310                  sizeof fid_oid(src) + sizeof fid_ver(src));
311         LASSERTF(fid_is_igif(src) || fid_ver(src) == 0, DFID"\n", PFID(src));
312         dst->f_seq = cpu_to_le64(fid_seq(src));
313         dst->f_oid = cpu_to_le32(fid_oid(src));
314         dst->f_ver = cpu_to_le32(fid_ver(src));
315 }
316 EXPORT_SYMBOL(fid_cpu_to_le);
317
318 void fid_le_to_cpu(struct lu_fid *dst, const struct lu_fid *src)
319 {
320         /* check that all fields are converted */
321         CLASSERT(sizeof *src ==
322                  sizeof fid_seq(src) +
323                  sizeof fid_oid(src) + sizeof fid_ver(src));
324         dst->f_seq = le64_to_cpu(fid_seq(src));
325         dst->f_oid = le32_to_cpu(fid_oid(src));
326         dst->f_ver = le32_to_cpu(fid_ver(src));
327         LASSERTF(fid_is_igif(dst) || fid_ver(dst) == 0, DFID"\n", PFID(dst));
328 }
329 EXPORT_SYMBOL(fid_le_to_cpu);
330
331 void range_cpu_to_le(struct lu_seq_range *dst, const struct lu_seq_range *src)
332 {
333         /* check that all fields are converted */
334         CLASSERT(sizeof(*src) ==
335                  sizeof(src->lsr_start) +
336                  sizeof(src->lsr_end) +
337                  sizeof(src->lsr_mdt) +
338                  sizeof(src->lsr_padding));
339         dst->lsr_start = cpu_to_le64(src->lsr_start);
340         dst->lsr_end = cpu_to_le64(src->lsr_end);
341 }
342 EXPORT_SYMBOL(range_cpu_to_le);
343
344 void range_le_to_cpu(struct lu_seq_range *dst, const struct lu_seq_range *src)
345 {
346         /* check that all fields are converted */
347         CLASSERT(sizeof(*src) ==
348                  sizeof(src->lsr_start) +
349                  sizeof(src->lsr_end) +
350                  sizeof(src->lsr_mdt) +
351                  sizeof(src->lsr_padding));
352
353         dst->lsr_start = le64_to_cpu(src->lsr_start);
354         dst->lsr_end = le64_to_cpu(src->lsr_end);
355 }
356 EXPORT_SYMBOL(range_le_to_cpu);
357
358 void range_cpu_to_be(struct lu_seq_range *dst, const struct lu_seq_range *src)
359 {
360         /* check that all fields are converted */
361         CLASSERT(sizeof(*src) ==
362                  sizeof(src->lsr_start) +
363                  sizeof(src->lsr_end) +
364                  sizeof(src->lsr_mdt) +
365                  sizeof(src->lsr_padding));
366
367         dst->lsr_start = cpu_to_be64(src->lsr_start);
368         dst->lsr_end = cpu_to_be64(src->lsr_end);
369 }
370 EXPORT_SYMBOL(range_cpu_to_be);
371
372 void range_be_to_cpu(struct lu_seq_range *dst, const struct lu_seq_range *src)
373 {
374         /* check that all fields are converted */
375         CLASSERT(sizeof(*src) ==
376                  sizeof(src->lsr_start) +
377                  sizeof(src->lsr_end) +
378                  sizeof(src->lsr_mdt) +
379                  sizeof(src->lsr_padding));
380
381         dst->lsr_start = be64_to_cpu(src->lsr_start);
382         dst->lsr_end = be64_to_cpu(src->lsr_end);
383 }
384 EXPORT_SYMBOL(range_be_to_cpu);
385
386 /**     
387  * Build (DLM) resource name from fid.
388  */
389 struct ldlm_res_id *
390 fid_build_reg_res_name(const struct lu_fid *f, struct ldlm_res_id *name)
391 {       
392         memset(name, 0, sizeof *name);
393         name->name[LUSTRE_RES_ID_SEQ_OFF] = fid_seq(f);
394         name->name[LUSTRE_RES_ID_OID_OFF] = fid_oid(f);
395         if (!fid_is_igif(f))
396                 name->name[LUSTRE_RES_ID_VER_OFF] = fid_ver(f);
397         return name;
398 }
399 EXPORT_SYMBOL(fid_build_reg_res_name);
400
401 /**
402  * Return true if resource is for object identified by fid.
403  */
404 int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
405 {
406         int ret;
407         
408         ret = name->name[LUSTRE_RES_ID_SEQ_OFF] == fid_seq(f) &&
409               name->name[LUSTRE_RES_ID_OID_OFF] == fid_oid(f);
410         if (!fid_is_igif(f))
411                 ret = ret && name->name[LUSTRE_RES_ID_VER_OFF] == fid_ver(f);
412         return ret;
413 }
414 EXPORT_SYMBOL(fid_res_name_eq);