Whamcloud - gitweb
- implemented almost all Nikita's notes in seq mgr and same ones in fld;
[fs/lustre-release.git] / lustre / fld / fld_request.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fld/fld_request.c
5  *  FLD (Fids Location Database)
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_FLD
32
33 #ifdef __KERNEL__
34 # include <libcfs/libcfs.h>
35 # include <linux/module.h>
36 # include <linux/jbd.h>
37 # include <asm/div64.h>
38 #else /* __KERNEL__ */
39 # include <liblustre.h>
40 # include <libcfs/list.h>
41 #endif
42
43 #include <obd.h>
44 #include <obd_class.h>
45 #include <lustre_ver.h>
46 #include <obd_support.h>
47 #include <lprocfs_status.h>
48
49 #include <dt_object.h>
50 #include <md_object.h>
51 #include <lustre_req_layout.h>
52 #include <lustre_fld.h>
53 #include "fld_internal.h"
54
55 static int fld_rrb_hash(struct lu_client_fld *fld,
56                         seqno_t seq)
57 {
58         if (fld->fld_count == 0)
59                 return 0;
60
61         return do_div(seq, fld->fld_count);
62 }
63
64 static int fld_dht_hash(struct lu_client_fld *fld,
65                         seqno_t seq)
66 {
67         /* XXX: here should be DHT hash */
68         return fld_rrb_hash(fld, seq);
69 }
70
71 struct lu_fld_hash fld_hash[3] = {
72         {
73                 .fh_name = "DHT",
74                 .fh_func = fld_dht_hash
75         },
76         {
77                 .fh_name = "Round Robin",
78                 .fh_func = fld_rrb_hash
79         },
80         {
81                 0,
82         }
83 };
84
85 /* this function makes decision if passed @target appropriate acoordingly to
86  * passed @hash. In case of usual round-robin hash, this is decided by comparing
87  * hash and target's index. In the case of DHT, algorithm is a bit more
88  * complicated. */
89 static int fld_client_apt_target(struct fld_target *target,
90                                  int hash)
91 {
92         /* XXX: DHT case should be worked out. */
93         return (target->fldt_idx == hash);
94 }
95
96 static struct fld_target *
97 fld_client_get_target(struct lu_client_fld *fld,
98                       seqno_t seq)
99 {
100         struct fld_target *target;
101         int hash;
102         ENTRY;
103
104         LASSERT(fld->fld_hash != NULL);
105
106         spin_lock(&fld->fld_lock);
107         hash = fld->fld_hash->fh_func(fld, seq);
108
109         list_for_each_entry(target,
110                             &fld->fld_targets, fldt_chain) {
111                 if (fld_client_apt_target(target, hash)) {
112                         spin_unlock(&fld->fld_lock);
113                         RETURN(target);
114                 }
115         }
116         spin_unlock(&fld->fld_lock);
117
118         /* if target is not found, there is logical error anyway, so here is
119          * LBUG() to catch that situation. */
120         LBUG();
121         RETURN(NULL);
122 }
123
124 /* add export to FLD. This is usually done by CMM and LMV as they are main users
125  * of FLD module. */
126 int fld_client_add_target(struct lu_client_fld *fld,
127                           struct obd_export *exp)
128 {
129         struct client_obd *cli = &exp->exp_obd->u.cli;
130         struct fld_target *target, *tmp;
131         ENTRY;
132
133         LASSERT(exp != NULL);
134
135         CDEBUG(D_INFO|D_WARNING, "%s: adding export %s\n",
136                fld->fld_name, cli->cl_target_uuid.uuid);
137
138         OBD_ALLOC_PTR(target);
139         if (target == NULL)
140                 RETURN(-ENOMEM);
141
142         spin_lock(&fld->fld_lock);
143         list_for_each_entry(tmp, &fld->fld_targets, fldt_chain) {
144                 if (obd_uuid_equals(&tmp->fldt_exp->exp_client_uuid,
145                                     &exp->exp_client_uuid))
146                 {
147                         spin_unlock(&fld->fld_lock);
148                         OBD_FREE_PTR(target);
149                         RETURN(-EEXIST);
150                 }
151         }
152
153         target->fldt_exp = class_export_get(exp);
154         target->fldt_idx = fld->fld_count;
155
156         list_add_tail(&target->fldt_chain,
157                       &fld->fld_targets);
158         fld->fld_count++;
159         spin_unlock(&fld->fld_lock);
160
161         RETURN(0);
162 }
163 EXPORT_SYMBOL(fld_client_add_target);
164
165 /* remove export from FLD */
166 int fld_client_del_target(struct lu_client_fld *fld,
167                           struct obd_export *exp)
168 {
169         struct fld_target *target, *tmp;
170         ENTRY;
171
172         spin_lock(&fld->fld_lock);
173         list_for_each_entry_safe(target, tmp,
174                                  &fld->fld_targets, fldt_chain) {
175                 if (obd_uuid_equals(&target->fldt_exp->exp_client_uuid,
176                                     &exp->exp_client_uuid))
177                 {
178                         fld->fld_count--;
179                         list_del(&target->fldt_chain);
180                         spin_unlock(&fld->fld_lock);
181                         class_export_put(target->fldt_exp);
182                         OBD_FREE_PTR(target);
183                         RETURN(0);
184                 }
185         }
186         spin_unlock(&fld->fld_lock);
187         RETURN(-ENOENT);
188 }
189 EXPORT_SYMBOL(fld_client_del_target);
190
191 #ifdef LPROCFS
192 static int fld_client_proc_init(struct lu_client_fld *fld)
193 {
194         int rc;
195         ENTRY;
196
197         fld->fld_proc_dir = lprocfs_register(fld->fld_name,
198                                              proc_lustre_root,
199                                              NULL, NULL);
200
201         if (IS_ERR(fld->fld_proc_dir)) {
202                 CERROR("LProcFS failed in fld-init\n");
203                 rc = PTR_ERR(fld->fld_proc_dir);
204                 GOTO(err, rc);
205         }
206
207         rc = lprocfs_add_vars(fld->fld_proc_dir,
208                               fld_client_proc_list, fld);
209         if (rc) {
210                 CERROR("can't init FLD "
211                        "proc, rc %d\n", rc);
212                 GOTO(err_dir, rc);
213         }
214
215         RETURN(0);
216
217 err_dir:
218         lprocfs_remove(fld->fld_proc_dir);
219 err:
220         fld->fld_proc_dir = NULL;
221         return rc;
222 }
223
224 static void fld_client_proc_fini(struct lu_client_fld *fld)
225 {
226         ENTRY;
227         if (fld->fld_proc_dir) {
228                 lprocfs_remove(fld->fld_proc_dir);
229                 fld->fld_proc_dir = NULL;
230         }
231         EXIT;
232 }
233 #else
/* Build without LPROCFS: there is nothing to register, always succeeds. */
static int fld_client_proc_init(struct lu_client_fld *fld)
{
        int rc = 0;

        return rc;
}
238
/* Build without LPROCFS: there is nothing to tear down. */
static void fld_client_proc_fini(struct lu_client_fld *fld)
{
}
243 #endif
244
245 static inline int hash_is_sane(int hash)
246 {
247         return (hash >= 0 && hash < ARRAY_SIZE(fld_hash));
248 }
249
250 int fld_client_init(struct lu_client_fld *fld,
251                     const char *uuid, int hash)
252 {
253         int rc = 0;
254         ENTRY;
255
256         LASSERT(fld != NULL);
257
258         if (!hash_is_sane(hash)) {
259                 CERROR("wrong hash function 0x%x\n", hash);
260                 RETURN(-EINVAL);
261         }
262
263         INIT_LIST_HEAD(&fld->fld_targets);
264         spin_lock_init(&fld->fld_lock);
265         fld->fld_hash = &fld_hash[hash];
266         fld->fld_count = 0;
267
268         snprintf(fld->fld_name, sizeof(fld->fld_name),
269                  "%s-cli-%s", LUSTRE_FLD_NAME, uuid);
270
271 #ifdef __KERNEL__
272         fld->fld_cache = fld_cache_init(FLD_HTABLE_SIZE);
273         if (IS_ERR(fld->fld_cache)) {
274                 rc = PTR_ERR(fld->fld_cache);
275                 fld->fld_cache = NULL;
276                 GOTO(out, rc);
277         }
278 #endif
279
280         rc = fld_client_proc_init(fld);
281         if (rc)
282                 GOTO(out, rc);
283         EXIT;
284 out:
285         if (rc)
286                 fld_client_fini(fld);
287         else
288                 CDEBUG(D_INFO|D_WARNING,
289                        "Client FLD, using \"%s\" hash\n",
290                        fld->fld_hash->fh_name);
291         return rc;
292 }
293 EXPORT_SYMBOL(fld_client_init);
294
295 void fld_client_fini(struct lu_client_fld *fld)
296 {
297         struct fld_target *target, *tmp;
298         ENTRY;
299
300         fld_client_proc_fini(fld);
301
302         spin_lock(&fld->fld_lock);
303         list_for_each_entry_safe(target, tmp,
304                                  &fld->fld_targets, fldt_chain) {
305                 fld->fld_count--;
306                 list_del(&target->fldt_chain);
307                 class_export_put(target->fldt_exp);
308                 OBD_FREE_PTR(target);
309         }
310         spin_unlock(&fld->fld_lock);
311
312 #ifdef __KERNEL__
313         if (fld->fld_cache != NULL) {
314                 fld_cache_fini(fld->fld_cache);
315                 fld->fld_cache = NULL;
316         }
317 #endif
318
319         CDEBUG(D_INFO|D_WARNING, "Client FLD finalized\n");
320         EXIT;
321 }
322 EXPORT_SYMBOL(fld_client_fini);
323
324 static int fld_client_rpc(struct obd_export *exp,
325                           struct md_fld *mf, __u32 fld_op)
326 {
327         int size[2] = {sizeof(__u32), sizeof(struct md_fld)}, rc;
328         int mf_size = sizeof(struct md_fld);
329         struct ptlrpc_request *req;
330         struct req_capsule pill;
331         struct md_fld *pmf;
332         __u32 *op;
333         ENTRY;
334
335         LASSERT(exp != NULL);
336
337         req = ptlrpc_prep_req(class_exp2cliimp(exp),
338                               LUSTRE_MDS_VERSION, FLD_QUERY,
339                               2, size, NULL);
340         if (req == NULL)
341                 RETURN(-ENOMEM);
342
343         req_capsule_init(&pill, req, RCL_CLIENT, NULL);
344
345         req_capsule_set(&pill, &RQF_FLD_QUERY);
346
347         op = req_capsule_client_get(&pill, &RMF_FLD_OPC);
348         *op = fld_op;
349
350         pmf = req_capsule_client_get(&pill, &RMF_FLD_MDFLD);
351         *pmf = *mf;
352
353         req->rq_replen = lustre_msg_size(1, &mf_size);
354         req->rq_request_portal = FLD_REQUEST_PORTAL;
355
356         rc = ptlrpc_queue_wait(req);
357         if (rc)
358                 GOTO(out_req, rc);
359
360         pmf = req_capsule_server_get(&pill, &RMF_FLD_MDFLD);
361         if (pmf == NULL)
362                 GOTO(out_req, rc = -EFAULT);
363         *mf = *pmf;
364         EXIT;
365 out_req:
366         req_capsule_fini(&pill);
367         ptlrpc_req_finished(req);
368         return rc;
369 }
370
371 static int __fld_client_create(struct lu_client_fld *fld,
372                                seqno_t seq, mdsno_t mds,
373                     struct md_fld *md_fld)
374 {
375         struct fld_target *target;
376         __u32 rc;
377         ENTRY;
378
379         target = fld_client_get_target(fld, seq);
380         if (!target)
381                 RETURN(-EINVAL);
382
383         rc = fld_client_rpc(target->fldt_exp, md_fld, FLD_CREATE);
384
385         if (rc  == 0) {
386                 /* do not return result of calling fld_cache_insert()
387                  * here. First of all because it may return -EEXISTS. Another
388                  * reason is that, we do not want to stop proceeding because of
389                  * cache errors. --umka */
390                 fld_cache_insert(fld->fld_cache, seq, mds);
391         }
392
393         RETURN(rc);
394 }
395
396 int fld_client_create(struct lu_client_fld *fld,
397                       seqno_t seq, mdsno_t mds)
398 {
399         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = mds };
400         __u32 rc;
401         ENTRY;
402
403         rc = __fld_client_create(fld, seq, mds, &md_fld);
404         RETURN(rc);
405 }
406 EXPORT_SYMBOL(fld_client_create);
407
408 static int __fld_client_delete(struct lu_client_fld *fld,
409                                seqno_t seq, struct md_fld *md_fld)
410 {
411         struct fld_target *target;
412         __u32 rc;
413
414         fld_cache_delete(fld->fld_cache, seq);
415
416         target = fld_client_get_target(fld, seq);
417         if (!target)
418                 RETURN(-EINVAL);
419
420         rc = fld_client_rpc(target->fldt_exp,
421                             md_fld, FLD_DELETE);
422         RETURN(rc);
423 }
424
425 int fld_client_delete(struct lu_client_fld *fld,
426                       seqno_t seq)
427 {
428         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
429         __u32 rc;
430
431         rc = __fld_client_delete(fld, seq, &md_fld);
432         RETURN(rc);
433 }
434 EXPORT_SYMBOL(fld_client_delete);
435
436 static int __fld_client_lookup(struct lu_client_fld *fld,
437                                seqno_t seq, mdsno_t *mds,
438                                struct md_fld *md_fld)
439 {
440         struct fld_target *target;
441         int rc;
442         ENTRY;
443
444         /* lookup it in the cache */
445         rc = fld_cache_lookup(fld->fld_cache, seq, mds);
446         if (rc == 0)
447                 RETURN(0);
448
449         /* can not find it in the cache */
450         target = fld_client_get_target(fld, seq);
451         if (!target)
452                 RETURN(-EINVAL);
453
454         rc = fld_client_rpc(target->fldt_exp,
455                             md_fld, FLD_LOOKUP);
456         if (rc == 0)
457                 *mds = md_fld->mf_mds;
458
459         /* do not return error here as well. See previous comment in same
460          * situation in function fld_client_create(). --umka */
461         fld_cache_insert(fld->fld_cache, seq, *mds);
462
463         RETURN(rc);
464 }
465
466 int fld_client_lookup(struct lu_client_fld *fld,
467                       seqno_t seq, mdsno_t *mds)
468 {
469         struct md_fld md_fld = { .mf_seq = seq, .mf_mds = 0 };
470         int rc;
471         ENTRY;
472
473         rc = __fld_client_lookup(fld, seq, mds, &md_fld);
474         RETURN(rc);
475 }
476 EXPORT_SYMBOL(fld_client_lookup);