Whamcloud - gitweb
move mdsno_t into lustre_idl.h and use this type in code for mds number.
[fs/lustre-release.git] / lustre / fld / fld_request.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/fld/fld_request.c
5  *  FLD (Fids Location Database)
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Yury Umanets <umka@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_FLD
32
33 #ifdef __KERNEL__
34 # include <libcfs/libcfs.h>
35 # include <linux/module.h>
36 # include <linux/jbd.h>
37 # include <asm/div64.h>
38 #else /* __KERNEL__ */
39 # include <liblustre.h>
40 # include <libcfs/list.h>
41 #endif
42
43 #include <obd.h>
44 #include <obd_class.h>
45 #include <lustre_ver.h>
46 #include <obd_support.h>
47 #include <lprocfs_status.h>
48
49 #include <dt_object.h>
50 #include <md_object.h>
51 #include <lustre_req_layout.h>
52 #include <lustre_fld.h>
53 #include "fld_internal.h"
54
55 #ifdef __KERNEL__
56 extern struct fld_cache_info *fld_cache;
57
58 static __u32 fld_cache_hash(__u64 seq)
59 {
60         return seq;
61 }
62
63 static int
64 fld_cache_insert(struct fld_cache_info *fld_cache,
65                  __u64 seq, __u64 mds)
66 {
67         struct fld_cache *fld;
68         struct hlist_head *bucket;
69         struct hlist_node *scan;
70         int rc = 0;
71         ENTRY;
72
73         bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
74                                         fld_cache->fld_hash_mask);
75
76         OBD_ALLOC_PTR(fld);
77         if (!fld)
78                 RETURN(-ENOMEM);
79
80         INIT_HLIST_NODE(&fld->fld_list);
81         fld->fld_mds = mds;
82         fld->fld_seq = seq;
83
84         spin_lock(&fld_cache->fld_lock);
85         hlist_for_each_entry(fld, scan, bucket, fld_list) {
86                 if (fld->fld_seq == seq)
87                         GOTO(exit_unlock, rc = -EEXIST);
88         }
89         hlist_add_head(&fld->fld_list, bucket);
90         EXIT;
91 exit_unlock:
92         spin_unlock(&fld_cache->fld_lock);
93         if (rc != 0)
94                 OBD_FREE(fld, sizeof(*fld));
95         return rc;
96 }
97
98 static struct fld_cache *
99 fld_cache_lookup(struct fld_cache_info *fld_cache, __u64 seq)
100 {
101         struct hlist_head *bucket;
102         struct hlist_node *scan;
103         struct fld_cache *fld;
104         ENTRY;
105
106         bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
107                                         fld_cache->fld_hash_mask);
108
109         spin_lock(&fld_cache->fld_lock);
110         hlist_for_each_entry(fld, scan, bucket, fld_list) {
111                 if (fld->fld_seq == seq) {
112                         spin_unlock(&fld_cache->fld_lock);
113                         RETURN(fld);
114                 }
115         }
116         spin_unlock(&fld_cache->fld_lock);
117
118         RETURN(NULL);
119 }
120
121 static void
122 fld_cache_delete(struct fld_cache_info *fld_cache, __u64 seq)
123 {
124         struct hlist_head *bucket;
125         struct hlist_node *scan;
126         struct fld_cache *fld;
127         ENTRY;
128
129         bucket = fld_cache->fld_hash + (fld_cache_hash(seq) &
130                                         fld_cache->fld_hash_mask);
131
132         spin_lock(&fld_cache->fld_lock);
133         hlist_for_each_entry(fld, scan, bucket, fld_list) {
134                 if (fld->fld_seq == seq) {
135                         hlist_del_init(&fld->fld_list);
136                         GOTO(out_unlock, 0);
137                 }
138         }
139
140         EXIT;
141 out_unlock:
142         spin_unlock(&fld_cache->fld_lock);
143         return;
144 }
145 #endif
146
147 static int fld_rrb_hash(struct lu_client_fld *fld, __u64 seq)
148 {
149         if (fld->fld_count == 0)
150                 return 0;
151         
152         return do_div(seq, fld->fld_count);
153 }
154
155 static int fld_dht_hash(struct lu_client_fld *fld, __u64 seq)
156 {
157         /* XXX: here should DHT hash */
158         return fld_rrb_hash(fld, seq);
159 }
160
161 static struct lu_fld_hash fld_hash[3] = {
162         {
163                 .fh_name = "DHT",
164                 .fh_func = fld_dht_hash
165         },
166         {
167                 .fh_name = "Round Robin",
168                 .fh_func = fld_rrb_hash
169         },
170         {
171                 0,
172         }
173 };
174
175 static struct obd_export *
176 fld_client_get_export(struct lu_client_fld *fld, __u64 seq)
177 {
178         struct obd_export *fld_exp;
179         int count = 0, hash;
180         ENTRY;
181
182         LASSERT(fld->fld_hash != NULL);
183         hash = fld->fld_hash->fh_func(fld, seq);
184
185         spin_lock(&fld->fld_lock);
186         list_for_each_entry(fld_exp,
187                             &fld->fld_exports, exp_fld_chain) {
188                 if (count == hash) {
189                         spin_unlock(&fld->fld_lock);
190                         RETURN(fld_exp);
191                 }
192                 count++;
193         }
194         spin_unlock(&fld->fld_lock);
195         RETURN(NULL);
196 }
197
198 /* add export to FLD. This is usually done by CMM and LMV as they are main users
199  * of FLD module. */
200 int fld_client_add_export(struct lu_client_fld *fld,
201                           struct obd_export *exp)
202 {
203         struct obd_export *fld_exp;
204         ENTRY;
205
206         LASSERT(exp != NULL);
207
208         CDEBUG(D_INFO|D_WARNING, "adding export %s\n",
209                exp->exp_client_uuid.uuid);
210         
211         spin_lock(&fld->fld_lock);
212         list_for_each_entry(fld_exp, &fld->fld_exports, exp_fld_chain) {
213                 if (obd_uuid_equals(&fld_exp->exp_client_uuid,
214                                     &exp->exp_client_uuid))
215                 {
216                         spin_unlock(&fld->fld_lock);
217                         RETURN(-EEXIST);
218                 }
219         }
220         
221         fld_exp = class_export_get(exp);
222         list_add_tail(&fld_exp->exp_fld_chain,
223                       &fld->fld_exports);
224         fld->fld_count++;
225         
226         spin_unlock(&fld->fld_lock);
227         
228         RETURN(0);
229 }
230 EXPORT_SYMBOL(fld_client_add_export);
231
232 /* remove export from FLD */
233 int fld_client_del_export(struct lu_client_fld *fld,
234                           struct obd_export *exp)
235 {
236         struct obd_export *fld_exp;
237         struct obd_export *tmp;
238         ENTRY;
239
240         spin_lock(&fld->fld_lock);
241         list_for_each_entry_safe(fld_exp, tmp, &fld->fld_exports, exp_fld_chain) {
242                 if (obd_uuid_equals(&fld_exp->exp_client_uuid,
243                                     &exp->exp_client_uuid))
244                 {
245                         fld->fld_count--;
246                         list_del(&fld_exp->exp_fld_chain);
247                         class_export_get(fld_exp);
248
249                         spin_unlock(&fld->fld_lock);
250                         RETURN(0);
251                 }
252         }
253         spin_unlock(&fld->fld_lock);
254         RETURN(-ENOENT);
255 }
256 EXPORT_SYMBOL(fld_client_del_export);
257
258 int fld_client_init(struct lu_client_fld *fld, int hash)
259 {
260         int rc = 0;
261         ENTRY;
262
263         LASSERT(fld != NULL);
264
265         if (hash < 0 || hash >= LUSTRE_CLI_FLD_HASH_LAST) {
266                 CERROR("wrong hash function 0x%x\n", hash);
267                 RETURN(-EINVAL);
268         }
269         
270         INIT_LIST_HEAD(&fld->fld_exports);
271         spin_lock_init(&fld->fld_lock);
272         fld->fld_hash = &fld_hash[hash];
273         fld->fld_count = 0;
274         
275         CDEBUG(D_INFO|D_WARNING, "Client FLD initialized, using \"%s\" hash\n",
276                fld->fld_hash->fh_name);
277         RETURN(rc);
278 }
279 EXPORT_SYMBOL(fld_client_init);
280
281 void fld_client_fini(struct lu_client_fld *fld)
282 {
283         struct obd_export *fld_exp;
284         struct obd_export *tmp;
285         ENTRY;
286
287         spin_lock(&fld->fld_lock);
288         list_for_each_entry_safe(fld_exp, tmp,
289                                  &fld->fld_exports, exp_fld_chain) {
290                 fld->fld_count--;
291                 list_del(&fld_exp->exp_fld_chain);
292                 class_export_get(fld_exp);
293         }
294         spin_unlock(&fld->fld_lock);
295         CDEBUG(D_INFO|D_WARNING, "Client FLD finalized\n");
296         EXIT;
297 }
298 EXPORT_SYMBOL(fld_client_fini);
299
300 static int
301 fld_client_rpc(struct obd_export *exp,
302                struct md_fld *mf, __u32 fld_op)
303 {
304         int size[2] = {sizeof(__u32), sizeof(struct md_fld)}, rc;
305         int mf_size = sizeof(struct md_fld);
306         struct ptlrpc_request *req;
307         struct md_fld *pmf;
308         __u32 *op;
309         ENTRY;
310
311         LASSERT(exp != NULL);
312
313         req = ptlrpc_prep_req(class_exp2cliimp(exp),
314                               LUSTRE_MDS_VERSION, FLD_QUERY,
315                               2, size, NULL);
316         if (req == NULL)
317                 RETURN(-ENOMEM);
318
319         op = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*op));
320         *op = fld_op;
321
322         pmf = lustre_msg_buf(req->rq_reqmsg, 1, sizeof (*pmf));
323         memcpy(pmf, mf, sizeof(*mf));
324
325         req->rq_replen = lustre_msg_size(1, &mf_size);
326         req->rq_request_portal = MDS_FLD_PORTAL;
327
328         rc = ptlrpc_queue_wait(req);
329         if (rc)
330                 GOTO(out_req, rc);
331
332         pmf = lustre_swab_repbuf(req, 0, sizeof(*pmf),
333                                  lustre_swab_md_fld);
334         *mf = *pmf; 
335 out_req:
336         ptlrpc_req_finished(req);
337         RETURN(rc);
338 }
339
340 int
341 fld_client_create(struct lu_client_fld *fld,
342                   __u64 seq, mdsno_t mds)
343 {
344         struct obd_export *fld_exp;
345         struct md_fld      md_fld;
346         __u32 rc;
347         ENTRY;
348
349         fld_exp = fld_client_get_export(fld, seq);
350         if (!fld_exp)
351                 RETURN(-EINVAL);
352         md_fld.mf_seq = seq;
353         md_fld.mf_mds = mds;
354         
355         rc = fld_client_rpc(fld_exp, &md_fld, FLD_CREATE);
356 #ifdef __KERNEL__
357         fld_cache_insert(fld_cache, seq, mds);
358 #endif
359         
360         RETURN(rc);
361 }
362 EXPORT_SYMBOL(fld_client_create);
363
364 int
365 fld_client_delete(struct lu_client_fld *fld,
366                   __u64 seq, mdsno_t mds)
367 {
368         struct obd_export *fld_exp;
369         struct md_fld      md_fld;
370         __u32 rc;
371
372 #ifdef __KERNEL__
373         fld_cache_delete(fld_cache, seq);
374 #endif
375         
376         fld_exp = fld_client_get_export(fld, seq);
377         if (!fld_exp)
378                 RETURN(-EINVAL);
379
380         md_fld.mf_seq = seq;
381         md_fld.mf_mds = mds;
382
383         rc = fld_client_rpc(fld_exp, &md_fld, FLD_DELETE);
384         RETURN(rc);
385 }
386 EXPORT_SYMBOL(fld_client_delete);
387
388 static int
389 fld_client_get(struct lu_client_fld *fld,
390                __u64 seq, mdsno_t *mds)
391 {
392         struct obd_export *fld_exp;
393         struct md_fld md_fld;
394         int rc;
395         ENTRY;
396
397         fld_exp = fld_client_get_export(fld, seq);
398         if (!fld_exp)
399                 RETURN(-EINVAL);
400                 
401         md_fld.mf_seq = seq;
402         rc = fld_client_rpc(fld_exp,
403                             &md_fld, FLD_LOOKUP);
404         if (rc == 0)
405                 *mds = md_fld.mf_mds;
406
407         RETURN(rc);
408 }
409
410 /* lookup fid in the namespace of pfid according to the name */
411 int
412 fld_client_lookup(struct lu_client_fld *fld,
413                   __u64 seq, mdsno_t *mds)
414 {
415 #ifdef __KERNEL__
416         struct fld_cache *fld_entry;
417 #endif
418         int rc;
419         ENTRY;
420
421 #ifdef __KERNEL__
422         /* lookup it in the cache */
423         fld_entry = fld_cache_lookup(fld_cache, seq);
424         if (fld_entry != NULL) {
425                 *mds = fld_entry->fld_mds;
426                 RETURN(0);
427         }
428 #endif
429         
430         /* can not find it in the cache */
431         rc = fld_client_get(fld, seq, mds);
432         if (rc)
433                 RETURN(rc);
434
435 #ifdef __KERNEL__
436         rc = fld_cache_insert(fld_cache, seq, *mds);
437 #endif
438         
439         RETURN(rc);
440 }
441 EXPORT_SYMBOL(fld_client_lookup);