Whamcloud - gitweb
LU-812 compat: clean up mutex lock to use kernel mutex primitive
[fs/lustre-release.git] / lustre / fid / fid_request.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, 2012, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/fid/fid_request.c
39  *
40  * Lustre Sequence Manager
41  *
42  * Author: Yury Umanets <umka@clusterfs.com>
43  */
44
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48 #define DEBUG_SUBSYSTEM S_FID
49
50 #ifdef __KERNEL__
51 # include <libcfs/libcfs.h>
52 # include <linux/module.h>
53 #else /* __KERNEL__ */
54 # include <liblustre.h>
55 #endif
56
57 #include <obd.h>
58 #include <obd_class.h>
59 #include <dt_object.h>
60 #include <md_object.h>
61 #include <obd_support.h>
62 #include <lustre_req_layout.h>
63 #include <lustre_fid.h>
64 /* mdc RPC locks */
65 #include <lustre_mdc.h>
66 #include "fid_internal.h"
67
68 static int seq_client_rpc(struct lu_client_seq *seq,
69                           struct lu_seq_range *output, __u32 opc,
70                           const char *opcname)
71 {
72         struct obd_export     *exp = seq->lcs_exp;
73         struct ptlrpc_request *req;
74         struct lu_seq_range   *out, *in;
75         __u32                 *op;
76         int                    rc;
77         ENTRY;
78
79         req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), &RQF_SEQ_QUERY,
80                                         LUSTRE_MDS_VERSION, SEQ_QUERY);
81         if (req == NULL)
82                 RETURN(-ENOMEM);
83
84         /* Init operation code */
85         op = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_OPC);
86         *op = opc;
87
88         /* Zero out input range, this is not recovery yet. */
89         in = req_capsule_client_get(&req->rq_pill, &RMF_SEQ_RANGE);
90         range_init(in);
91
92         ptlrpc_request_set_replen(req);
93
94        if (seq->lcs_type == LUSTRE_SEQ_METADATA) {
95                 req->rq_request_portal = SEQ_METADATA_PORTAL;
96                 in->lsr_flags = LU_SEQ_RANGE_MDT;
97         } else {
98                 LASSERTF(seq->lcs_type == LUSTRE_SEQ_DATA,
99                          "unknown lcs_type %u\n", seq->lcs_type);
100                 req->rq_request_portal = SEQ_DATA_PORTAL;
101                 in->lsr_flags = LU_SEQ_RANGE_OST;
102         }
103
104         if (opc == SEQ_ALLOC_SUPER) {
105                 /* Update index field of *in, it is required for
106                  * FLD update on super sequence allocator node. */
107                 in->lsr_index = seq->lcs_space.lsr_index;
108                 req->rq_request_portal = SEQ_CONTROLLER_PORTAL;
109         } else {
110                 LASSERTF(opc == SEQ_ALLOC_META,
111                          "unknown opcode %u\n, opc", opc);
112         }
113
114         ptlrpc_at_set_req_timeout(req);
115
116         mdc_get_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
117         rc = ptlrpc_queue_wait(req);
118         mdc_put_rpc_lock(exp->exp_obd->u.cli.cl_rpc_lock, NULL);
119
120         if (rc)
121                 GOTO(out_req, rc);
122
123         out = req_capsule_server_get(&req->rq_pill, &RMF_SEQ_RANGE);
124         *output = *out;
125
126         if (!range_is_sane(output)) {
127                 CERROR("%s: Invalid range received from server: "
128                        DRANGE"\n", seq->lcs_name, PRANGE(output));
129                 GOTO(out_req, rc = -EINVAL);
130         }
131
132         if (range_is_exhausted(output)) {
133                 CERROR("%s: Range received from server is exhausted: "
134                        DRANGE"]\n", seq->lcs_name, PRANGE(output));
135                 GOTO(out_req, rc = -EINVAL);
136         }
137
138         CDEBUG(D_INFO, "%s: Allocated %s-sequence "DRANGE"]\n",
139                seq->lcs_name, opcname, PRANGE(output));
140
141         EXIT;
142 out_req:
143         ptlrpc_req_finished(req);
144         return rc;
145 }
146
147 /* Request sequence-controller node to allocate new super-sequence. */
148 int seq_client_alloc_super(struct lu_client_seq *seq,
149                            const struct lu_env *env)
150 {
151         int rc;
152         ENTRY;
153
154         cfs_mutex_lock(&seq->lcs_mutex);
155
156 #ifdef __KERNEL__
157         if (seq->lcs_srv) {
158                 LASSERT(env != NULL);
159                 rc = seq_server_alloc_super(seq->lcs_srv, &seq->lcs_space,
160                                             env);
161         } else {
162 #endif
163                 rc = seq_client_rpc(seq, &seq->lcs_space,
164                                     SEQ_ALLOC_SUPER, "super");
165 #ifdef __KERNEL__
166         }
167 #endif
168         cfs_mutex_unlock(&seq->lcs_mutex);
169         RETURN(rc);
170 }
171
172 /* Request sequence-controller node to allocate new meta-sequence. */
173 static int seq_client_alloc_meta(const struct lu_env *env,
174                                  struct lu_client_seq *seq)
175 {
176         int rc;
177         ENTRY;
178
179 #ifdef __KERNEL__
180         if (seq->lcs_srv) {
181                 LASSERT(env != NULL);
182                 rc = seq_server_alloc_meta(seq->lcs_srv, &seq->lcs_space, env);
183         } else {
184 #endif
185                 rc = seq_client_rpc(seq, &seq->lcs_space,
186                                     SEQ_ALLOC_META, "meta");
187 #ifdef __KERNEL__
188         }
189 #endif
190         RETURN(rc);
191 }
192
193 /* Allocate new sequence for client. */
194 static int seq_client_alloc_seq(const struct lu_env *env,
195                                 struct lu_client_seq *seq, seqno_t *seqnr)
196 {
197         int rc;
198         ENTRY;
199
200         LASSERT(range_is_sane(&seq->lcs_space));
201
202         if (range_is_exhausted(&seq->lcs_space)) {
203                 rc = seq_client_alloc_meta(env, seq);
204                 if (rc) {
205                         CERROR("%s: Can't allocate new meta-sequence,"
206                                "rc %d\n", seq->lcs_name, rc);
207                         RETURN(rc);
208                 } else {
209                         CDEBUG(D_INFO, "%s: New range - "DRANGE"\n",
210                                seq->lcs_name, PRANGE(&seq->lcs_space));
211                 }
212         } else {
213                 rc = 0;
214         }
215
216         LASSERT(!range_is_exhausted(&seq->lcs_space));
217         *seqnr = seq->lcs_space.lsr_start;
218         seq->lcs_space.lsr_start += 1;
219
220         CDEBUG(D_INFO, "%s: Allocated sequence ["LPX64"]\n", seq->lcs_name,
221                *seqnr);
222
223         RETURN(rc);
224 }
225
226 static int seq_fid_alloc_prep(struct lu_client_seq *seq,
227                               cfs_waitlink_t *link)
228 {
229         if (seq->lcs_update) {
230                 cfs_waitq_add(&seq->lcs_waitq, link);
231                 cfs_set_current_state(CFS_TASK_UNINT);
232                 cfs_mutex_unlock(&seq->lcs_mutex);
233
234                 cfs_waitq_wait(link, CFS_TASK_UNINT);
235
236                 cfs_mutex_lock(&seq->lcs_mutex);
237                 cfs_waitq_del(&seq->lcs_waitq, link);
238                 cfs_set_current_state(CFS_TASK_RUNNING);
239                 return -EAGAIN;
240         }
241         ++seq->lcs_update;
242         cfs_mutex_unlock(&seq->lcs_mutex);
243         return 0;
244 }
245
246 static void seq_fid_alloc_fini(struct lu_client_seq *seq)
247 {
248         LASSERT(seq->lcs_update == 1);
249         cfs_mutex_lock(&seq->lcs_mutex);
250         --seq->lcs_update;
251         cfs_waitq_signal(&seq->lcs_waitq);
252 }
253
254 /* Allocate the whole seq to the caller*/
255 int seq_client_get_seq(const struct lu_env *env,
256                        struct lu_client_seq *seq, seqno_t *seqnr)
257 {
258         cfs_waitlink_t link;
259         int rc;
260
261         LASSERT(seqnr != NULL);
262         cfs_mutex_lock(&seq->lcs_mutex);
263         cfs_waitlink_init(&link);
264
265         while (1) {
266                 rc = seq_fid_alloc_prep(seq, &link);
267                 if (rc == 0)
268                         break;
269         }
270
271         rc = seq_client_alloc_seq(env, seq, seqnr);
272         if (rc) {
273                 CERROR("%s: Can't allocate new sequence, "
274                        "rc %d\n", seq->lcs_name, rc);
275                 seq_fid_alloc_fini(seq);
276                 cfs_mutex_unlock(&seq->lcs_mutex);
277                 return rc;
278         }
279
280         CDEBUG(D_INFO, "%s: allocate sequence "
281                "[0x%16.16"LPF64"x]\n", seq->lcs_name, *seqnr);
282
283         /*Since the caller require the whole seq,
284          *so marked this seq to be used*/
285         seq->lcs_fid.f_oid = LUSTRE_SEQ_MAX_WIDTH;
286         seq->lcs_fid.f_seq = *seqnr;
287         seq->lcs_fid.f_ver = 0;
288
289         /*
290          * Inform caller that sequence switch is performed to allow it
291          * to setup FLD for it.
292          */
293         seq_fid_alloc_fini(seq);
294         cfs_mutex_unlock(&seq->lcs_mutex);
295
296         return rc;
297 }
298 EXPORT_SYMBOL(seq_client_get_seq);
299
300 /* Allocate new fid on passed client @seq and save it to @fid. */
301 int seq_client_alloc_fid(const struct lu_env *env,
302                          struct lu_client_seq *seq, struct lu_fid *fid)
303 {
304         cfs_waitlink_t link;
305         int rc;
306         ENTRY;
307
308         LASSERT(seq != NULL);
309         LASSERT(fid != NULL);
310
311         cfs_waitlink_init(&link);
312         cfs_mutex_lock(&seq->lcs_mutex);
313
314         while (1) {
315                 seqno_t seqnr;
316
317                 if (!fid_is_zero(&seq->lcs_fid) &&
318                     fid_oid(&seq->lcs_fid) < seq->lcs_width) {
319                         /* Just bump last allocated fid and return to caller. */
320                         seq->lcs_fid.f_oid += 1;
321                         rc = 0;
322                         break;
323                 }
324
325                 rc = seq_fid_alloc_prep(seq, &link);
326                 if (rc)
327                         continue;
328
329                 rc = seq_client_alloc_seq(env, seq, &seqnr);
330                 if (rc) {
331                         CERROR("%s: Can't allocate new sequence, "
332                                "rc %d\n", seq->lcs_name, rc);
333                         seq_fid_alloc_fini(seq);
334                         cfs_mutex_unlock(&seq->lcs_mutex);
335                         RETURN(rc);
336                 }
337
338                 CDEBUG(D_INFO, "%s: Switch to sequence "
339                        "[0x%16.16"LPF64"x]\n", seq->lcs_name, seqnr);
340
341                 seq->lcs_fid.f_oid = LUSTRE_FID_INIT_OID;
342                 seq->lcs_fid.f_seq = seqnr;
343                 seq->lcs_fid.f_ver = 0;
344
345                 /*
346                  * Inform caller that sequence switch is performed to allow it
347                  * to setup FLD for it.
348                  */
349                 rc = 1;
350
351                 seq_fid_alloc_fini(seq);
352                 break;
353         }
354
355         *fid = seq->lcs_fid;
356         cfs_mutex_unlock(&seq->lcs_mutex);
357
358         CDEBUG(D_INFO, "%s: Allocated FID "DFID"\n", seq->lcs_name,  PFID(fid));
359         RETURN(rc);
360 }
361 EXPORT_SYMBOL(seq_client_alloc_fid);
362
363 /*
364  * Finish the current sequence due to disconnect.
365  * See mdc_import_event()
366  */
367 void seq_client_flush(struct lu_client_seq *seq)
368 {
369         cfs_waitlink_t link;
370
371         LASSERT(seq != NULL);
372         cfs_waitlink_init(&link);
373         cfs_mutex_lock(&seq->lcs_mutex);
374
375         while (seq->lcs_update) {
376                 cfs_waitq_add(&seq->lcs_waitq, &link);
377                 cfs_set_current_state(CFS_TASK_UNINT);
378                 cfs_mutex_unlock(&seq->lcs_mutex);
379
380                 cfs_waitq_wait(&link, CFS_TASK_UNINT);
381
382                 cfs_mutex_lock(&seq->lcs_mutex);
383                 cfs_waitq_del(&seq->lcs_waitq, &link);
384                 cfs_set_current_state(CFS_TASK_RUNNING);
385         }
386
387         fid_zero(&seq->lcs_fid);
388         /**
389          * this id shld not be used for seq range allocation.
390          * set to -1 for dgb check.
391          */
392
393         seq->lcs_space.lsr_index = -1;
394
395         range_init(&seq->lcs_space);
396         cfs_mutex_unlock(&seq->lcs_mutex);
397 }
398 EXPORT_SYMBOL(seq_client_flush);
399
400 static void seq_client_proc_fini(struct lu_client_seq *seq);
401
402 #ifdef LPROCFS
403 static int seq_client_proc_init(struct lu_client_seq *seq)
404 {
405         int rc;
406         ENTRY;
407
408         seq->lcs_proc_dir = lprocfs_register(seq->lcs_name,
409                                              seq_type_proc_dir,
410                                              NULL, NULL);
411
412         if (IS_ERR(seq->lcs_proc_dir)) {
413                 CERROR("%s: LProcFS failed in seq-init\n",
414                        seq->lcs_name);
415                 rc = PTR_ERR(seq->lcs_proc_dir);
416                 RETURN(rc);
417         }
418
419         rc = lprocfs_add_vars(seq->lcs_proc_dir,
420                               seq_client_proc_list, seq);
421         if (rc) {
422                 CERROR("%s: Can't init sequence manager "
423                        "proc, rc %d\n", seq->lcs_name, rc);
424                 GOTO(out_cleanup, rc);
425         }
426
427         RETURN(0);
428
429 out_cleanup:
430         seq_client_proc_fini(seq);
431         return rc;
432 }
433
434 static void seq_client_proc_fini(struct lu_client_seq *seq)
435 {
436         ENTRY;
437         if (seq->lcs_proc_dir) {
438                 if (!IS_ERR(seq->lcs_proc_dir))
439                         lprocfs_remove(&seq->lcs_proc_dir);
440                 seq->lcs_proc_dir = NULL;
441         }
442         EXIT;
443 }
444 #else
445 static int seq_client_proc_init(struct lu_client_seq *seq)
446 {
447         return 0;
448 }
449
450 static void seq_client_proc_fini(struct lu_client_seq *seq)
451 {
452         return;
453 }
454 #endif
455
456 int seq_client_init(struct lu_client_seq *seq,
457                     struct obd_export *exp,
458                     enum lu_cli_type type,
459                     const char *prefix,
460                     struct lu_server_seq *srv)
461 {
462         int rc;
463         ENTRY;
464
465         LASSERT(seq != NULL);
466         LASSERT(prefix != NULL);
467
468         seq->lcs_exp = exp;
469         seq->lcs_srv = srv;
470         seq->lcs_type = type;
471         cfs_mutex_init(&seq->lcs_mutex);
472         seq->lcs_width = LUSTRE_SEQ_MAX_WIDTH;
473         cfs_waitq_init(&seq->lcs_waitq);
474
475         /* Make sure that things are clear before work is started. */
476         seq_client_flush(seq);
477
478         if (exp == NULL) {
479                 LASSERT(seq->lcs_srv != NULL);
480         } else {
481                 LASSERT(seq->lcs_exp != NULL);
482                 seq->lcs_exp = class_export_get(seq->lcs_exp);
483         }
484
485         snprintf(seq->lcs_name, sizeof(seq->lcs_name),
486                  "cli-%s", prefix);
487
488         rc = seq_client_proc_init(seq);
489         if (rc)
490                 seq_client_fini(seq);
491         RETURN(rc);
492 }
493 EXPORT_SYMBOL(seq_client_init);
494
495 void seq_client_fini(struct lu_client_seq *seq)
496 {
497         ENTRY;
498
499         seq_client_proc_fini(seq);
500
501         if (seq->lcs_exp != NULL) {
502                 class_export_put(seq->lcs_exp);
503                 seq->lcs_exp = NULL;
504         }
505
506         seq->lcs_srv = NULL;
507         EXIT;
508 }
509 EXPORT_SYMBOL(seq_client_fini);