Whamcloud - gitweb
- make HEAD from b_post_cmd3
[fs/lustre-release.git] / lustre / fid / fid_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fid/fid_request.c
5  *  Lustre Sequence Manager
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_FID
33
34 #ifdef __KERNEL__
35 # include <libcfs/libcfs.h>
36 # include <linux/module.h>
37 #else /* __KERNEL__ */
38 # include <liblustre.h>
39 #endif
40
41 #include <obd.h>
42 #include <obd_class.h>
43 #include <dt_object.h>
44 #include <md_object.h>
45 #include <obd_support.h>
46 #include <lustre_req_layout.h>
47 #include <lustre_fid.h>
48 /* mdc RPC locks */
49 #include <lustre_mdc.h>
50 #include "fid_internal.h"
51
52 static int seq_client_rpc(struct lu_client_seq *seq, struct lu_range *input,
53                           struct lu_range *output, __u32 opc,
54                           const char *opcname)
55 {
56         int rc, size[3] = { sizeof(struct ptlrpc_body),
57                             sizeof(__u32),
58                             sizeof(struct lu_range) };
59         struct obd_export *exp = seq->lcs_exp;
60         struct ptlrpc_request *req;
61         struct lu_range *out, *in;
62         struct req_capsule pill;
63         __u32 *op;
64         ENTRY;
65
66         req = ptlrpc_prep_req(class_exp2cliimp(exp), LUSTRE_MDS_VERSION,
67                               SEQ_QUERY, 3, size, NULL);
68         if (req == NULL)
69                 RETURN(-ENOMEM);
70
71         req_capsule_init(&pill, req, RCL_CLIENT, NULL);
72         req_capsule_set(&pill, &RQF_SEQ_QUERY);
73
74         /* Init operation code */
75         op = req_capsule_client_get(&pill, &RMF_SEQ_OPC);
76         *op = opc;
77
78         /* Zero out input range, this is not recovery yet. */
79         in = req_capsule_client_get(&pill, &RMF_SEQ_RANGE);
80         if (input != NULL)
81                 *in = *input;
82         else
83                 range_zero(in);
84
85         size[1] = sizeof(struct lu_range);
86         ptlrpc_req_set_repsize(req, 2, size);
87
88         if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
89                 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
90                         SEQ_CONTROLLER_PORTAL : SEQ_METADATA_PORTAL;
91         } else {
92                 req->rq_request_portal = (opc == SEQ_ALLOC_SUPER) ?
93                         SEQ_CONTROLLER_PORTAL : SEQ_DATA_PORTAL;
94         }
95
96         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
97         rc = ptlrpc_queue_wait(req);
98         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
99
100         if (rc)
101                 GOTO(out_req, rc);
102
103         out = req_capsule_server_get(&pill, &RMF_SEQ_RANGE);
104         *output = *out;
105
106         if (!range_is_sane(output)) {
107                 CERROR("%s: Invalid range received from server: "
108                        DRANGE"\n", seq->lcs_name, PRANGE(output));
109                 GOTO(out_req, rc = -EINVAL);
110         }
111
112         if (range_is_exhausted(output)) {
113                 CERROR("%s: Range received from server is exhausted: "
114                        DRANGE"]\n", seq->lcs_name, PRANGE(output));
115                 GOTO(out_req, rc = -EINVAL);
116         }
117         *in = *out;
118
119         CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n",
120                seq->lcs_name, opcname, PRANGE(output));
121
122         EXIT;
123 out_req:
124         req_capsule_fini(&pill);
125         ptlrpc_req_finished(req);
126         return rc;
127 }
128
129 /* Request sequence-controller node to allocate new super-sequence. */
130 int seq_client_replay_super(struct lu_client_seq *seq,
131                             struct lu_range *range,
132                             const struct lu_env *env)
133 {
134         int rc;
135         ENTRY;
136
137         down(&seq->lcs_sem);
138
139 #ifdef __KERNEL__
140         if (seq->lcs_srv) {
141                 LASSERT(env != NULL);
142                 rc = seq_server_alloc_super(seq->lcs_srv, range,
143                                             &seq->lcs_space, env);
144         } else {
145 #endif
146                 rc = seq_client_rpc(seq, range, &seq->lcs_space,
147                                     SEQ_ALLOC_SUPER, "super");
148 #ifdef __KERNEL__
149         }
150 #endif
151         up(&seq->lcs_sem);
152         RETURN(rc);
153 }
154
155 /* Request sequence-controller node to allocate new super-sequence. */
156 int seq_client_alloc_super(struct lu_client_seq *seq,
157                            const struct lu_env *env)
158 {
159         ENTRY;
160         RETURN(seq_client_replay_super(seq, NULL, env));
161 }
162
163 /* Request sequence-controller node to allocate new meta-sequence. */
164 static int seq_client_alloc_meta(struct lu_client_seq *seq,
165                                  const struct lu_env *env)
166 {
167         int rc;
168         ENTRY;
169
170 #ifdef __KERNEL__
171         if (seq->lcs_srv) {
172                 LASSERT(env != NULL);
173                 rc = seq_server_alloc_meta(seq->lcs_srv, NULL,
174                                            &seq->lcs_space, env);
175         } else {
176 #endif
177                 rc = seq_client_rpc(seq, NULL, &seq->lcs_space,
178                                     SEQ_ALLOC_META, "meta");
179 #ifdef __KERNEL__
180         }
181 #endif
182         RETURN(rc);
183 }
184
185 /* Allocate new sequence for client. */
186 static int seq_client_alloc_seq(struct lu_client_seq *seq, seqno_t *seqnr)
187 {
188         int rc;
189         ENTRY;
190
191         LASSERT(range_is_sane(&seq->lcs_space));
192
193         if (range_is_exhausted(&seq->lcs_space)) {
194                 rc = seq_client_alloc_meta(seq, NULL);
195                 if (rc) {
196                         CERROR("%s: Can't allocate new meta-sequence, "
197                                "rc %d\n", seq->lcs_name, rc);
198                         RETURN(rc);
199                 } else {
200                         CDEBUG(D_INFO, "%s: New range - "DRANGE"\n",
201                                seq->lcs_name, PRANGE(&seq->lcs_space));
202                 }
203         } else {
204                 rc = 0;
205         }
206
207         LASSERT(!range_is_exhausted(&seq->lcs_space));
208         *seqnr = seq->lcs_space.lr_start;
209         seq->lcs_space.lr_start += 1;
210
211         CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
212                *seqnr);
213
214         RETURN(rc);
215 }
216
217 /* Allocate new fid on passed client @seq and save it to @fid. */
218 int seq_client_alloc_fid(struct lu_client_seq *seq, struct lu_fid *fid)
219 {
220         int rc;
221         ENTRY;
222
223         LASSERT(seq != NULL);
224         LASSERT(fid != NULL);
225
226         down(&seq->lcs_sem);
227
228         if (fid_is_zero(&seq->lcs_fid) ||
229             fid_oid(&seq->lcs_fid) >= seq->lcs_width)
230         {
231                 seqno_t seqnr;
232
233                 rc = seq_client_alloc_seq(seq, &seqnr);
234                 if (rc) {
235                         CERROR("%s: Can't allocate new sequence, "
236                                "rc %d\n", seq->lcs_name, rc);
237                         up(&seq->lcs_sem);
238                         RETURN(rc);
239                 }
240
241                 CDEBUG(D_INFO, "%s: Switch to sequence "
242                        "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
243
244                 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
245                 seq->lcs_fid.f_seq = seqnr;
246                 seq->lcs_fid.f_ver = 0;
247
248                 /*
249                  * Inform caller that sequence switch is performed to allow it
250                  * to setup FLD for it.
251                  */
252                 rc = 1;
253         } else {
254                 /* Just bump last allocated fid and return to caller. */
255                 seq->lcs_fid.f_oid += 1;
256                 rc = 0;
257         }
258
259         *fid = seq->lcs_fid;
260         up(&seq->lcs_sem);
261
262         CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,  PFID(fid));
263         RETURN(rc);
264 }
265 EXPORT_SYMBOL(seq_client_alloc_fid);
266
267 /*
268  * Finish the current sequence due to disconnect.
269  * See mdc_import_event()
270  */
271 void seq_client_flush(struct lu_client_seq *seq)
272 {
273         LASSERT(seq != NULL);
274         down(&seq->lcs_sem);
275         fid_zero(&seq->lcs_fid);
276         range_zero(&seq->lcs_space);
277         up(&seq->lcs_sem);
278 }
279 EXPORT_SYMBOL(seq_client_flush);
280
281 static void seq_client_proc_fini(struct lu_client_seq *seq);
282
283 #ifdef LPROCFS
284 static int seq_client_proc_init(struct lu_client_seq *seq)
285 {
286         int rc;
287         ENTRY;
288
289         seq->lcs_proc_dir = lprocfs_register(seq->lcs_name,
290                                              seq_type_proc_dir,
291                                              NULL, NULL);
292
293         if (IS_ERR(seq->lcs_proc_dir)) {
294                 CERROR("%s: LProcFS failed in seq-init\n",
295                        seq->lcs_name);
296                 rc = PTR_ERR(seq->lcs_proc_dir);
297                 RETURN(rc);
298         }
299
300         rc = lprocfs_add_vars(seq->lcs_proc_dir,
301                               seq_client_proc_list, seq);
302         if (rc) {
303                 CERROR("%s: Can't init sequence manager "
304                        "proc, rc %d\n", seq->lcs_name, rc);
305                 GOTO(out_cleanup, rc);
306         }
307
308         RETURN(0);
309
310 out_cleanup:
311         seq_client_proc_fini(seq);
312         return rc;
313 }
314
315 static void seq_client_proc_fini(struct lu_client_seq *seq)
316 {
317         ENTRY;
318         if (seq->lcs_proc_dir) {
319                 if (!IS_ERR(seq->lcs_proc_dir))
320                         lprocfs_remove(&seq->lcs_proc_dir);
321                 seq->lcs_proc_dir = NULL;
322         }
323         EXIT;
324 }
325 #else
326 static int seq_client_proc_init(struct lu_client_seq *seq)
327 {
328         return 0;
329 }
330
331 static void seq_client_proc_fini(struct lu_client_seq *seq)
332 {
333         return;
334 }
335 #endif
336
337 int seq_client_init(struct lu_client_seq *seq,
338                     struct obd_export *exp,
339                     enum lu_cli_type type,
340                     const char *prefix,
341                     struct lu_server_seq *srv)
342 {
343         int rc;
344         ENTRY;
345
346         LASSERT(seq != NULL);
347         LASSERT(prefix != NULL);
348
349         seq->lcs_exp = exp;
350         seq->lcs_srv = srv;
351         seq->lcs_type = type;
352         sema_init(&seq->lcs_sem, 1);
353         seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH;
354
355         /* Make sure that things are clear before work is started. */
356         seq_client_flush(seq);
357
358         if (exp == NULL) {
359                 LASSERT(seq->lcs_srv != NULL);
360         } else {
361                 LASSERT(seq->lcs_exp != NULL);
362                 seq->lcs_exp = class_export_get(seq->lcs_exp);
363         }
364
365         snprintf(seq->lcs_name, sizeof(seq->lcs_name),
366                  "cli-%s", prefix);
367
368         rc = seq_client_proc_init(seq);
369         if (rc)
370                 seq_client_fini(seq);
371         RETURN(rc);
372 }
373 EXPORT_SYMBOL(seq_client_init);
374
375 void seq_client_fini(struct lu_client_seq *seq)
376 {
377         ENTRY;
378
379         seq_client_proc_fini(seq);
380
381         if (seq->lcs_exp != NULL) {
382                 class_export_put(seq->lcs_exp);
383                 seq->lcs_exp = NULL;
384         }
385
386         seq->lcs_srv = NULL;
387         EXIT;
388 }
389 EXPORT_SYMBOL(seq_client_fini);