Whamcloud - gitweb
LU-12678 ptlrpc: remove bogus LASSERT
[fs/lustre-release.git] / lustre / ptlrpc / nodemap_storage.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2015, Trustees of Indiana University
24  *
25  * Copyright (c) 2017, Intel Corporation.
26  *
27  * Author: Joshua Walgenbach <jjw@iu.edu>
28  * Author: Kit Westneat <cwestnea@iu.edu>
29  *
30  * Implements the storage functionality for the nodemap configuration. Functions
31  * in this file prepare, store, and load nodemap configuration data. Targets
32  * using nodemap services should register a configuration file object. Nodemap
33  * configuration changes that need to persist should call the appropriate
34  * storage function for the data being modified.
35  *
36  * There are several index types as defined in enum nodemap_idx_type:
37  *      NODEMAP_CLUSTER_IDX     stores the data found on the lu_nodemap struct,
38  *                              like root squash and config flags, as well as
39  *                              the name.
40  *      NODEMAP_RANGE_IDX       stores NID range information for a nodemap
41  *      NODEMAP_UIDMAP_IDX      stores a fs/client UID mapping pair
42  *      NODEMAP_GIDMAP_IDX      stores a fs/client GID mapping pair
43  *      NODEMAP_GLOBAL_IDX      stores whether or not nodemaps are active
44  */
45
46 #include <libcfs/libcfs.h>
47 #include <linux/err.h>
48 #include <linux/kernel.h>
49 #include <linux/list.h>
50 #include <linux/mutex.h>
51 #include <linux/string.h>
52 #include <linux/types.h>
53 #include <uapi/linux/lnet/lnet-types.h>
54 #include <uapi/linux/lustre/lustre_idl.h>
55 #include <dt_object.h>
56 #include <lu_object.h>
57 #include <lustre_net.h>
58 #include <lustre_nodemap.h>
59 #include <obd_class.h>
60 #include <obd_support.h>
61 #include "nodemap_internal.h"
62
63 /* List of registered nodemap index files, excluding the MGS index. */
64 static LIST_HEAD(ncf_list_head);
65 static DEFINE_MUTEX(ncf_list_lock);
66
67 /* The MGS index is tracked separately; the other index files are listeners to it. */
68 static struct nm_config_file *nodemap_mgs_ncf;
69
/*
 * lu_nodemap flags as packed into the flags word of a cluster record.
 * Each enumerator is a distinct single-bit mask (a mask, not a shift count,
 * despite the name of the enum).
 */
enum nm_flag_shifts {
        NM_FL_ALLOW_ROOT_ACCESS = 1 << 0,
        NM_FL_TRUST_CLIENT_IDS  = 1 << 1,
        NM_FL_DENY_UNKNOWN      = 1 << 2,
        NM_FL_MAP_UID           = 1 << 3,
        NM_FL_MAP_GID           = 1 << 4,
        NM_FL_ENABLE_AUDIT      = 1 << 5,
        NM_FL_FORBID_ENCRYPT    = 1 << 6,
        NM_FL_MAP_PROJID        = 1 << 7,
};
81
82 static void nodemap_cluster_key_init(struct nodemap_key *nk, unsigned int nm_id)
83 {
84         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id,
85                                                         NODEMAP_CLUSTER_IDX));
86         nk->nk_unused = 0;
87 }
88
89 static void nodemap_cluster_rec_init(union nodemap_rec *nr,
90                                      const struct lu_nodemap *nodemap)
91 {
92         BUILD_BUG_ON(sizeof(nr->ncr.ncr_name) != sizeof(nodemap->nm_name));
93
94         strncpy(nr->ncr.ncr_name, nodemap->nm_name, sizeof(nr->ncr.ncr_name));
95         nr->ncr.ncr_squash_uid = cpu_to_le32(nodemap->nm_squash_uid);
96         nr->ncr.ncr_squash_gid = cpu_to_le32(nodemap->nm_squash_gid);
97         nr->ncr.ncr_squash_projid = cpu_to_le32(nodemap->nm_squash_projid);
98         nr->ncr.ncr_flags = cpu_to_le32(
99                 (nodemap->nmf_trust_client_ids ?
100                         NM_FL_TRUST_CLIENT_IDS : 0) |
101                 (nodemap->nmf_allow_root_access ?
102                         NM_FL_ALLOW_ROOT_ACCESS : 0) |
103                 (nodemap->nmf_deny_unknown ?
104                         NM_FL_DENY_UNKNOWN : 0) |
105                 (nodemap->nmf_map_mode & NODEMAP_MAP_UID ?
106                         NM_FL_MAP_UID : 0) |
107                 (nodemap->nmf_map_mode & NODEMAP_MAP_GID ?
108                         NM_FL_MAP_GID : 0) |
109                 (nodemap->nmf_map_mode & NODEMAP_MAP_PROJID ?
110                         NM_FL_MAP_PROJID : 0) |
111                 (nodemap->nmf_enable_audit ?
112                         NM_FL_ENABLE_AUDIT : 0) |
113                 (nodemap->nmf_forbid_encryption ?
114                         NM_FL_FORBID_ENCRYPT : 0));
115 }
116
117 static void nodemap_idmap_key_init(struct nodemap_key *nk, unsigned int nm_id,
118                                    enum nodemap_id_type id_type,
119                                    u32 id_client)
120 {
121         enum nodemap_idx_type idx_type;
122
123         if (id_type == NODEMAP_UID)
124                 idx_type = NODEMAP_UIDMAP_IDX;
125         else if (id_type == NODEMAP_GID)
126                 idx_type = NODEMAP_GIDMAP_IDX;
127         else if (id_type == NODEMAP_PROJID)
128                 idx_type = NODEMAP_PROJIDMAP_IDX;
129         else
130                 idx_type = NODEMAP_EMPTY_IDX;
131
132         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id, idx_type));
133         nk->nk_id_client = cpu_to_le32(id_client);
134 }
135
136 static void nodemap_idmap_rec_init(union nodemap_rec *nr, u32 id_fs)
137 {
138         nr->nir.nir_id_fs = cpu_to_le32(id_fs);
139 }
140
141 static void nodemap_range_key_init(struct nodemap_key *nk, unsigned int nm_id,
142                                    unsigned int rn_id)
143 {
144         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id,
145                                                         NODEMAP_RANGE_IDX));
146         nk->nk_range_id = cpu_to_le32(rn_id);
147 }
148
149 static void nodemap_range_rec_init(union nodemap_rec *nr,
150                                    const lnet_nid_t nid[2])
151 {
152         nr->nrr.nrr_start_nid = cpu_to_le64(nid[0]);
153         nr->nrr.nrr_end_nid = cpu_to_le64(nid[1]);
154 }
155
156 static void nodemap_global_key_init(struct nodemap_key *nk)
157 {
158         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(0, NODEMAP_GLOBAL_IDX));
159         nk->nk_unused = 0;
160 }
161
162 static void nodemap_global_rec_init(union nodemap_rec *nr, bool active)
163 {
164         nr->ngr.ngr_is_active = active;
165 }
166
/*
 * Bump the version of @nodemap_idx by one inside transaction @th.
 * Should be called with the dt_write lock held on the index.
 */
static void nodemap_inc_version(const struct lu_env *env,
                                struct dt_object *nodemap_idx,
                                struct thandle *th)
{
        dt_version_set(env, nodemap_idx,
                       dt_version_get(env, nodemap_idx) + 1, th);
}
175
/* Mode argument of nodemap_cache_find_create(). */
enum ncfc_find_create {
        /* drop any existing index and start from a fresh one */
        NCFC_CREATE_NEW = 1,
};
179
180 static struct dt_object *nodemap_cache_find_create(const struct lu_env *env,
181                                                    struct dt_device *dev,
182                                                    struct local_oid_storage *los,
183                                                    enum ncfc_find_create create_new)
184 {
185         struct lu_fid tfid;
186         struct dt_object *root_obj;
187         struct dt_object *nm_obj;
188         int rc = 0;
189
190         rc = dt_root_get(env, dev, &tfid);
191         if (rc < 0)
192                 GOTO(out, nm_obj = ERR_PTR(rc));
193
194         root_obj = dt_locate(env, dev, &tfid);
195         if (unlikely(IS_ERR(root_obj)))
196                 GOTO(out, nm_obj = root_obj);
197
198         rc = dt_lookup_dir(env, root_obj, LUSTRE_NODEMAP_NAME, &tfid);
199         if (rc == -ENOENT) {
200                 if (dev->dd_rdonly)
201                         GOTO(out_root, nm_obj = ERR_PTR(-EROFS));
202         } else if (rc) {
203                 GOTO(out_root, nm_obj = ERR_PTR(rc));
204         } else if (dev->dd_rdonly && create_new == NCFC_CREATE_NEW) {
205                 GOTO(out_root, nm_obj = ERR_PTR(-EROFS));
206         }
207
208 again:
209         /* if loading index fails the first time, create new index */
210         if (create_new == NCFC_CREATE_NEW && rc != -ENOENT) {
211                 CDEBUG(D_INFO, "removing old index, creating new one\n");
212                 rc = local_object_unlink(env, dev, root_obj,
213                                          LUSTRE_NODEMAP_NAME);
214                 if (rc < 0) {
215                         /* XXX not sure the best way to get obd name. */
216                         CERROR("cannot destroy nodemap index: rc = %d\n",
217                                rc);
218                         GOTO(out_root, nm_obj = ERR_PTR(rc));
219                 }
220         }
221
222         nm_obj = local_index_find_or_create(env, los, root_obj,
223                                                 LUSTRE_NODEMAP_NAME,
224                                                 S_IFREG | S_IRUGO | S_IWUSR,
225                                                 &dt_nodemap_features);
226         if (IS_ERR(nm_obj))
227                 GOTO(out_root, nm_obj);
228
229         if (nm_obj->do_index_ops == NULL) {
230                 rc = nm_obj->do_ops->do_index_try(env, nm_obj,
231                                                       &dt_nodemap_features);
232                 /* even if loading from tgt fails, connecting to MGS will
233                  * rewrite the config
234                  */
235                 if (rc < 0) {
236                         dt_object_put(env, nm_obj);
237
238                         if (create_new == NCFC_CREATE_NEW)
239                                 GOTO(out_root, nm_obj = ERR_PTR(rc));
240
241                         CERROR("cannot load nodemap index from disk, creating "
242                                "new index: rc = %d\n", rc);
243                         create_new = NCFC_CREATE_NEW;
244                         goto again;
245                 }
246         }
247
248 out_root:
249         dt_object_put(env, root_obj);
250 out:
251         return nm_obj;
252 }
253
254 static int nodemap_idx_insert(const struct lu_env *env,
255                               struct dt_object *idx,
256                               const struct nodemap_key *nk,
257                               const union nodemap_rec *nr)
258 {
259         struct thandle *th;
260         struct dt_device *dev = lu2dt_dev(idx->do_lu.lo_dev);
261         int rc;
262
263         BUILD_BUG_ON(sizeof(union nodemap_rec) != 32);
264
265         th = dt_trans_create(env, dev);
266
267         if (IS_ERR(th))
268                 GOTO(out, rc = PTR_ERR(th));
269
270         rc = dt_declare_insert(env, idx,
271                                (const struct dt_rec *)nr,
272                                (const struct dt_key *)nk, th);
273         if (rc != 0)
274                 GOTO(out, rc);
275
276         rc = dt_declare_version_set(env, idx, th);
277         if (rc != 0)
278                 GOTO(out, rc);
279
280         rc = dt_trans_start_local(env, dev, th);
281         if (rc != 0)
282                 GOTO(out, rc);
283
284         dt_write_lock(env, idx, 0);
285
286         rc = dt_insert(env, idx, (const struct dt_rec *)nr,
287                        (const struct dt_key *)nk, th);
288
289         nodemap_inc_version(env, idx, th);
290         dt_write_unlock(env, idx);
291 out:
292         dt_trans_stop(env, dev, th);
293
294         return rc;
295 }
296
297 static int nodemap_idx_update(const struct lu_env *env,
298                               struct dt_object *idx,
299                               const struct nodemap_key *nk,
300                               const union nodemap_rec *nr)
301 {
302         struct thandle          *th;
303         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
304         int                      rc = 0;
305
306         th = dt_trans_create(env, dev);
307
308         if (IS_ERR(th))
309                 GOTO(out, rc = PTR_ERR(th));
310
311         rc = dt_declare_delete(env, idx, (const struct dt_key *)nk, th);
312         if (rc != 0)
313                 GOTO(out, rc);
314
315         rc = dt_declare_insert(env, idx, (const struct dt_rec *)nr,
316                                (const struct dt_key *)nk, th);
317         if (rc != 0)
318                 GOTO(out, rc);
319
320         rc = dt_declare_version_set(env, idx, th);
321         if (rc != 0)
322                 GOTO(out, rc);
323
324         rc = dt_trans_start_local(env, dev, th);
325         if (rc != 0)
326                 GOTO(out, rc);
327
328         dt_write_lock(env, idx, 0);
329
330         rc = dt_delete(env, idx, (const struct dt_key *)nk, th);
331         if (rc != 0)
332                 GOTO(out_lock, rc);
333
334         rc = dt_insert(env, idx, (const struct dt_rec *)nr,
335                        (const struct dt_key *)nk, th);
336         if (rc != 0)
337                 GOTO(out_lock, rc);
338
339         nodemap_inc_version(env, idx, th);
340 out_lock:
341         dt_write_unlock(env, idx);
342 out:
343         dt_trans_stop(env, dev, th);
344
345         return rc;
346 }
347
348 static int nodemap_idx_delete(const struct lu_env *env,
349                               struct dt_object *idx,
350                               const struct nodemap_key *nk,
351                               const union nodemap_rec *unused)
352 {
353         struct thandle          *th;
354         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
355         int                      rc = 0;
356
357         th = dt_trans_create(env, dev);
358
359         if (IS_ERR(th))
360                 GOTO(out, rc = PTR_ERR(th));
361
362         rc = dt_declare_delete(env, idx, (const struct dt_key *)nk, th);
363         if (rc != 0)
364                 GOTO(out, rc);
365
366         rc = dt_declare_version_set(env, idx, th);
367         if (rc != 0)
368                 GOTO(out, rc);
369
370         rc = dt_trans_start_local(env, dev, th);
371         if (rc != 0)
372                 GOTO(out, rc);
373
374         dt_write_lock(env, idx, 0);
375
376         rc = dt_delete(env, idx, (const struct dt_key *)nk, th);
377
378         nodemap_inc_version(env, idx, th);
379
380         dt_write_unlock(env, idx);
381 out:
382         dt_trans_stop(env, dev, th);
383
384         return rc;
385 }
386
/* Selects between inserting a new record and replacing an existing one. */
enum nm_add_update {
        NM_ADD,         /* insert a record that is not in the index yet */
        NM_UPDATE,      /* replace a record already in the index */
};
391
392 static int nodemap_idx_nodemap_add_update(const struct lu_nodemap *nodemap,
393                                           struct dt_object *idx,
394                                           enum nm_add_update update)
395 {
396         struct nodemap_key nk;
397         union nodemap_rec nr;
398         struct lu_env env;
399         int rc = 0;
400
401         ENTRY;
402
403         rc = lu_env_init(&env, LCT_LOCAL);
404         if (rc)
405                 RETURN(rc);
406
407         nodemap_cluster_key_init(&nk, nodemap->nm_id);
408         nodemap_cluster_rec_init(&nr, nodemap);
409
410         if (update == NM_UPDATE)
411                 rc = nodemap_idx_update(&env, idx, &nk, &nr);
412         else
413                 rc = nodemap_idx_insert(&env, idx, &nk, &nr);
414
415         lu_env_fini(&env);
416
417         RETURN(rc);
418 }
419
420 int nodemap_idx_nodemap_add(const struct lu_nodemap *nodemap)
421 {
422         if (nodemap_mgs_ncf == NULL) {
423                 CERROR("cannot add nodemap config to non-existing MGS.\n");
424                 return -EINVAL;
425         }
426
427         return nodemap_idx_nodemap_add_update(nodemap, nodemap_mgs_ncf->ncf_obj,
428                                               NM_ADD);
429 }
430
431 int nodemap_idx_nodemap_update(const struct lu_nodemap *nodemap)
432 {
433         if (nodemap_mgs_ncf == NULL) {
434                 CERROR("cannot add nodemap config to non-existing MGS.\n");
435                 return -EINVAL;
436         }
437
438         return nodemap_idx_nodemap_add_update(nodemap, nodemap_mgs_ncf->ncf_obj,
439                                               NM_UPDATE);
440 }
441
442 int nodemap_idx_nodemap_del(const struct lu_nodemap *nodemap)
443 {
444         struct rb_root           root;
445         struct lu_idmap         *idmap;
446         struct lu_idmap         *temp;
447         struct lu_nid_range     *range;
448         struct lu_nid_range     *range_temp;
449         struct nodemap_key       nk;
450         struct lu_env            env;
451         int                      rc = 0;
452         int                      rc2 = 0;
453
454         ENTRY;
455
456         if (nodemap_mgs_ncf == NULL) {
457                 CERROR("cannot add nodemap config to non-existing MGS.\n");
458                 return -EINVAL;
459         }
460
461         rc = lu_env_init(&env, LCT_LOCAL);
462         if (rc != 0)
463                 RETURN(rc);
464
465         root = nodemap->nm_fs_to_client_uidmap;
466         nm_rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
467                                                 id_fs_to_client) {
468                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_UID,
469                                        idmap->id_client);
470                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
471                                          &nk, NULL);
472                 if (rc2 < 0)
473                         rc = rc2;
474         }
475
476         root = nodemap->nm_client_to_fs_gidmap;
477         nm_rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
478                                                 id_client_to_fs) {
479                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_GID,
480                                        idmap->id_client);
481                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
482                                          &nk, NULL);
483                 if (rc2 < 0)
484                         rc = rc2;
485         }
486
487         root = nodemap->nm_client_to_fs_projidmap;
488         nm_rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
489                                                 id_client_to_fs) {
490                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_PROJID,
491                                        idmap->id_client);
492                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
493                                          &nk, NULL);
494                 if (rc2 < 0)
495                         rc = rc2;
496         }
497
498         list_for_each_entry_safe(range, range_temp, &nodemap->nm_ranges,
499                                  rn_list) {
500                 nodemap_range_key_init(&nk, nodemap->nm_id, range->rn_id);
501                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
502                                          &nk, NULL);
503                 if (rc2 < 0)
504                         rc = rc2;
505         }
506
507         nodemap_cluster_key_init(&nk, nodemap->nm_id);
508         rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
509         if (rc2 < 0)
510                 rc = rc2;
511
512         lu_env_fini(&env);
513
514         RETURN(rc);
515 }
516
517 int nodemap_idx_range_add(const struct lu_nid_range *range,
518                           const lnet_nid_t nid[2])
519 {
520         struct nodemap_key       nk;
521         union nodemap_rec        nr;
522         struct lu_env            env;
523         int                      rc = 0;
524         ENTRY;
525
526         if (nodemap_mgs_ncf == NULL) {
527                 CERROR("cannot add nodemap config to non-existing MGS.\n");
528                 return -EINVAL;
529         }
530
531         rc = lu_env_init(&env, LCT_LOCAL);
532         if (rc != 0)
533                 RETURN(rc);
534
535         nodemap_range_key_init(&nk, range->rn_nodemap->nm_id, range->rn_id);
536         nodemap_range_rec_init(&nr, nid);
537
538         rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj, &nk, &nr);
539         lu_env_fini(&env);
540
541         RETURN(rc);
542 }
543
544 int nodemap_idx_range_del(const struct lu_nid_range *range)
545 {
546         struct nodemap_key       nk;
547         struct lu_env            env;
548         int                      rc = 0;
549         ENTRY;
550
551         if (nodemap_mgs_ncf == NULL) {
552                 CERROR("cannot add nodemap config to non-existing MGS.\n");
553                 return -EINVAL;
554         }
555
556         rc = lu_env_init(&env, LCT_LOCAL);
557         if (rc != 0)
558                 RETURN(rc);
559
560         nodemap_range_key_init(&nk, range->rn_nodemap->nm_id, range->rn_id);
561
562         rc = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
563         lu_env_fini(&env);
564
565         RETURN(rc);
566 }
567
568 int nodemap_idx_idmap_add(const struct lu_nodemap *nodemap,
569                           enum nodemap_id_type id_type,
570                           const u32 map[2])
571 {
572         struct nodemap_key       nk;
573         union nodemap_rec        nr;
574         struct lu_env            env;
575         int                      rc = 0;
576         ENTRY;
577
578         if (nodemap_mgs_ncf == NULL) {
579                 CERROR("cannot add nodemap config to non-existing MGS.\n");
580                 return -EINVAL;
581         }
582
583         rc = lu_env_init(&env, LCT_LOCAL);
584         if (rc != 0)
585                 RETURN(rc);
586
587         nodemap_idmap_key_init(&nk, nodemap->nm_id, id_type, map[0]);
588         nodemap_idmap_rec_init(&nr, map[1]);
589
590         rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj, &nk, &nr);
591         lu_env_fini(&env);
592
593         RETURN(rc);
594 }
595
596 int nodemap_idx_idmap_del(const struct lu_nodemap *nodemap,
597                           enum nodemap_id_type id_type,
598                           const u32 map[2])
599 {
600         struct nodemap_key       nk;
601         struct lu_env            env;
602         int                      rc = 0;
603         ENTRY;
604
605         if (nodemap_mgs_ncf == NULL) {
606                 CERROR("cannot add nodemap config to non-existing MGS.\n");
607                 return -EINVAL;
608         }
609
610         rc = lu_env_init(&env, LCT_LOCAL);
611         if (rc != 0)
612                 RETURN(rc);
613
614         nodemap_idmap_key_init(&nk, nodemap->nm_id, id_type, map[0]);
615
616         rc = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
617         lu_env_fini(&env);
618
619         RETURN(rc);
620 }
621
622 static int nodemap_idx_global_add_update(bool value, enum nm_add_update update)
623 {
624         struct nodemap_key       nk;
625         union nodemap_rec        nr;
626         struct lu_env            env;
627         int                      rc = 0;
628         ENTRY;
629
630         if (nodemap_mgs_ncf == NULL) {
631                 CERROR("cannot add nodemap config to non-existing MGS.\n");
632                 return -EINVAL;
633         }
634
635         rc = lu_env_init(&env, LCT_LOCAL);
636         if (rc != 0)
637                 RETURN(rc);
638
639         nodemap_global_key_init(&nk);
640         nodemap_global_rec_init(&nr, value);
641
642         if (update == NM_UPDATE)
643                 rc = nodemap_idx_update(&env, nodemap_mgs_ncf->ncf_obj,
644                                         &nk, &nr);
645         else
646                 rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj,
647                                         &nk, &nr);
648
649         lu_env_fini(&env);
650
651         RETURN(rc);
652 }
653
654 int nodemap_idx_nodemap_activate(bool value)
655 {
656         return nodemap_idx_global_add_update(value, NM_UPDATE);
657 }
658
659 static enum nodemap_idx_type nodemap_get_key_type(const struct nodemap_key *key)
660 {
661         u32                      nodemap_id;
662
663         nodemap_id = le32_to_cpu(key->nk_nodemap_id);
664         return nm_idx_get_type(nodemap_id);
665 }
666
667 /**
668  * Process a key/rec pair and modify the new configuration.
669  *
670  * \param       config          configuration to update with this key/rec data
671  * \param       key             key of the record that was loaded
672  * \param       rec             record that was loaded
673  * \param       recent_nodemap  last referenced nodemap
674  * \retval      type of record processed, see enum #nodemap_idx_type
675  * \retval      -ENOENT         range or map loaded before nodemap record
676  * \retval      -EINVAL         duplicate nodemap cluster records found with
677  *                              different IDs, or nodemap has invalid name
678  * \retval      -ENOMEM
679  */
680 static int nodemap_process_keyrec(struct nodemap_config *config,
681                                   const struct nodemap_key *key,
682                                   const union nodemap_rec *rec,
683                                   struct lu_nodemap **recent_nodemap)
684 {
685         struct lu_nodemap *nodemap = NULL;
686         enum nodemap_idx_type type;
687         enum nodemap_id_type id_type;
688         u8 flags;
689         u32 nodemap_id;
690         lnet_nid_t nid[2];
691         u32 map[2];
692         int rc;
693
694         ENTRY;
695
696         BUILD_BUG_ON(sizeof(union nodemap_rec) != 32);
697
698         nodemap_id = le32_to_cpu(key->nk_nodemap_id);
699         type = nodemap_get_key_type(key);
700         nodemap_id = nm_idx_set_type(nodemap_id, 0);
701
702         CDEBUG(D_INFO, "found config entry, nm_id %d type %d\n",
703                nodemap_id, type);
704
705         /* find the correct nodemap in the load list */
706         if (type == NODEMAP_RANGE_IDX || type == NODEMAP_UIDMAP_IDX ||
707             type == NODEMAP_GIDMAP_IDX || type == NODEMAP_PROJIDMAP_IDX) {
708                 struct lu_nodemap *tmp = NULL;
709
710                 nodemap = *recent_nodemap;
711
712                 if (nodemap == NULL)
713                         GOTO(out, rc = -ENOENT);
714
715                 if (nodemap->nm_id != nodemap_id) {
716                         list_for_each_entry(tmp, &nodemap->nm_list, nm_list)
717                                 if (tmp->nm_id == nodemap_id) {
718                                         nodemap = tmp;
719                                         break;
720                                 }
721
722                         if (nodemap->nm_id != nodemap_id)
723                                 GOTO(out, rc = -ENOENT);
724                 }
725
726                 /* update most recently used nodemap if necessay */
727                 if (nodemap != *recent_nodemap)
728                         *recent_nodemap = nodemap;
729         }
730
731         switch (type) {
732         case NODEMAP_EMPTY_IDX:
733                 if (nodemap_id != 0)
734                         CWARN("Found nodemap config record without type field, "
735                               " nodemap_id=%d. nodemap config file corrupt?\n",
736                               nodemap_id);
737                 break;
738         case NODEMAP_CLUSTER_IDX: {
739                 struct lu_nodemap *old_nm = NULL;
740
741                 nodemap = cfs_hash_lookup(config->nmc_nodemap_hash,
742                                           rec->ncr.ncr_name);
743                 if (nodemap == NULL) {
744                         if (nodemap_id == LUSTRE_NODEMAP_DEFAULT_ID) {
745                                 nodemap = nodemap_create(rec->ncr.ncr_name,
746                                                          config, 1);
747                         } else {
748                                 nodemap = nodemap_create(rec->ncr.ncr_name,
749                                                          config, 0);
750                         }
751                         if (IS_ERR(nodemap))
752                                 GOTO(out, rc = PTR_ERR(nodemap));
753
754                         /* we need to override the local ID with the saved ID */
755                         nodemap->nm_id = nodemap_id;
756                         if (nodemap_id > config->nmc_nodemap_highest_id)
757                                 config->nmc_nodemap_highest_id = nodemap_id;
758
759                 } else if (nodemap->nm_id != nodemap_id) {
760                         nodemap_putref(nodemap);
761                         GOTO(out, rc = -EINVAL);
762                 }
763
764                 nodemap->nm_squash_uid =
765                                 le32_to_cpu(rec->ncr.ncr_squash_uid);
766                 nodemap->nm_squash_gid =
767                                 le32_to_cpu(rec->ncr.ncr_squash_gid);
768                 nodemap->nm_squash_projid =
769                         le32_to_cpu(rec->ncr.ncr_squash_projid);
770
771                 flags = le32_to_cpu(rec->ncr.ncr_flags);
772                 nodemap->nmf_allow_root_access =
773                                         flags & NM_FL_ALLOW_ROOT_ACCESS;
774                 nodemap->nmf_trust_client_ids =
775                                         flags & NM_FL_TRUST_CLIENT_IDS;
776                 nodemap->nmf_deny_unknown =
777                                         flags & NM_FL_DENY_UNKNOWN;
778                 nodemap->nmf_map_mode = (flags & NM_FL_MAP_UID ?
779                                          NODEMAP_MAP_UID : 0) |
780                                         (flags & NM_FL_MAP_GID ?
781                                          NODEMAP_MAP_GID : 0) |
782                                         (flags & NM_FL_MAP_PROJID ?
783                                          NODEMAP_MAP_PROJID : 0);
784                 nodemap->nmf_enable_audit =
785                                         flags & NM_FL_ENABLE_AUDIT;
786                 nodemap->nmf_forbid_encryption =
787                                         flags & NM_FL_FORBID_ENCRYPT;
788
789                 /* The fileset should be saved otherwise it will be empty
790                  * every time in case of "NODEMAP_CLUSTER_IDX". */
791                 mutex_lock(&active_config_lock);
792                 old_nm = nodemap_lookup(rec->ncr.ncr_name);
793                 if (!IS_ERR(old_nm) && old_nm->nm_fileset[0] != '\0')
794                         strlcpy(nodemap->nm_fileset, old_nm->nm_fileset,
795                                 sizeof(nodemap->nm_fileset));
796                 mutex_unlock(&active_config_lock);
797                 if (!IS_ERR(old_nm))
798                         nodemap_putref(old_nm);
799
800                 if (*recent_nodemap == NULL) {
801                         *recent_nodemap = nodemap;
802                         INIT_LIST_HEAD(&nodemap->nm_list);
803                 } else {
804                         list_add(&nodemap->nm_list,
805                                  &(*recent_nodemap)->nm_list);
806                 }
807                 nodemap_putref(nodemap);
808                 break;
809         }
810         case NODEMAP_RANGE_IDX:
811                 nid[0] = le64_to_cpu(rec->nrr.nrr_start_nid);
812                 nid[1] = le64_to_cpu(rec->nrr.nrr_end_nid);
813
814                 rc = nodemap_add_range_helper(config, nodemap, nid,
815                                         le32_to_cpu(key->nk_range_id));
816                 if (rc != 0)
817                         GOTO(out, rc);
818                 break;
819         case NODEMAP_UIDMAP_IDX:
820         case NODEMAP_GIDMAP_IDX:
821         case NODEMAP_PROJIDMAP_IDX:
822                 map[0] = le32_to_cpu(key->nk_id_client);
823                 map[1] = le32_to_cpu(rec->nir.nir_id_fs);
824
825                 if (type == NODEMAP_UIDMAP_IDX)
826                         id_type = NODEMAP_UID;
827                 else if (type == NODEMAP_GIDMAP_IDX)
828                         id_type = NODEMAP_GID;
829                 else if (type == NODEMAP_PROJIDMAP_IDX)
830                         id_type = NODEMAP_PROJID;
831                 else
832                         GOTO(out, rc = -EINVAL);
833
834                 rc = nodemap_add_idmap_helper(nodemap, id_type, map);
835                 if (rc != 0)
836                         GOTO(out, rc);
837                 break;
838         case NODEMAP_GLOBAL_IDX:
839                 config->nmc_nodemap_is_active = rec->ngr.ngr_is_active;
840                 break;
841         default:
842                 CERROR("got keyrec pair for unknown type %d\n", type);
843                 break;
844         }
845
846         rc = type;
847
848         EXIT;
849
850 out:
851         return rc;
852 }
853
854 enum nm_config_passes {
855         NM_READ_CLUSTERS = 0,
856         NM_READ_ATTRIBUTES = 1,
857 };
858
859 static int nodemap_load_entries(const struct lu_env *env,
860                                 struct dt_object *nodemap_idx)
861 {
862         const struct dt_it_ops *iops;
863         struct dt_it *it;
864         struct lu_nodemap *recent_nodemap = NULL;
865         struct nodemap_config *new_config = NULL;
866         u64 hash = 0;
867         bool activate_nodemap = false;
868         bool loaded_global_idx = false;
869         enum nm_config_passes cur_pass = NM_READ_CLUSTERS;
870         int rc = 0;
871
872         ENTRY;
873
874         iops = &nodemap_idx->do_index_ops->dio_it;
875
876         dt_read_lock(env, nodemap_idx, 0);
877         it = iops->init(env, nodemap_idx, 0);
878         if (IS_ERR(it))
879                 GOTO(out, rc = PTR_ERR(it));
880
881         rc = iops->load(env, it, hash);
882         if (rc < 0)
883                 GOTO(out_iops_fini, rc);
884
885         /* rc == 0 means we need to advance to record */
886         if (rc == 0) {
887                 rc = iops->next(env, it);
888
889                 if (rc < 0)
890                         GOTO(out_iops_put, rc);
891                 /* rc > 0 is eof, will be checked in while below */
892         } else {
893                 /* rc == 1, we found initial record and can process below */
894                 rc = 0;
895         }
896
897         new_config = nodemap_config_alloc();
898         if (IS_ERR(new_config)) {
899                 rc = PTR_ERR(new_config);
900                 new_config = NULL;
901                 GOTO(out_iops_put, rc);
902         }
903
904         /* rc > 0 is eof, check initial iops->next here as well */
905         while (rc == 0) {
906                 struct nodemap_key *key;
907                 union nodemap_rec rec;
908                 enum nodemap_idx_type key_type;
909
910                 key = (struct nodemap_key *)iops->key(env, it);
911                 key_type = nodemap_get_key_type((struct nodemap_key *)key);
912                 if ((cur_pass == NM_READ_CLUSTERS &&
913                                 key_type == NODEMAP_CLUSTER_IDX) ||
914                     (cur_pass == NM_READ_ATTRIBUTES &&
915                                 key_type != NODEMAP_CLUSTER_IDX &&
916                                 key_type != NODEMAP_EMPTY_IDX)) {
917                         rc = iops->rec(env, it, (struct dt_rec *)&rec, 0);
918                         if (rc != -ESTALE) {
919                                 if (rc != 0)
920                                         GOTO(out_nodemap_config, rc);
921                                 rc = nodemap_process_keyrec(new_config, key, &rec,
922                                                             &recent_nodemap);
923                                 if (rc < 0)
924                                         GOTO(out_nodemap_config, rc);
925                                 if (rc == NODEMAP_GLOBAL_IDX)
926                                         loaded_global_idx = true;
927                         }
928                 }
929
930                 do
931                         rc = iops->next(env, it);
932                 while (rc == -ESTALE);
933
934                 /* move to second pass */
935                 if (rc > 0 && cur_pass == NM_READ_CLUSTERS) {
936                         cur_pass = NM_READ_ATTRIBUTES;
937                         rc = iops->load(env, it, 0);
938                         if (rc == 0)
939                                 rc = iops->next(env, it);
940                         else if (rc > 0)
941                                 rc = 0;
942                         else
943                                 GOTO(out, rc);
944                 }
945         }
946
947         if (rc > 0)
948                 rc = 0;
949
950 out_nodemap_config:
951         if (rc != 0)
952                 nodemap_config_dealloc(new_config);
953         else
954                 /* creating new default needs to be done outside dt read lock */
955                 activate_nodemap = true;
956 out_iops_put:
957         iops->put(env, it);
958 out_iops_fini:
959         iops->fini(env, it);
960 out:
961         dt_read_unlock(env, nodemap_idx);
962
963         if (rc != 0)
964                 CWARN("%s: failed to load nodemap configuration: rc = %d\n",
965                       nodemap_idx->do_lu.lo_dev->ld_obd->obd_name, rc);
966
967         if (!activate_nodemap)
968                 RETURN(rc);
969
970         if (new_config->nmc_default_nodemap == NULL) {
971                 /* new MGS won't have a default nm on disk, so create it here */
972                 struct lu_nodemap *nodemap =
973                         nodemap_create(DEFAULT_NODEMAP, new_config, 1);
974                 if (IS_ERR(nodemap)) {
975                         rc = PTR_ERR(nodemap);
976                 } else {
977                         rc = nodemap_idx_nodemap_add_update(
978                                         new_config->nmc_default_nodemap,
979                                         nodemap_idx,
980                                         NM_ADD);
981                         nodemap_putref(new_config->nmc_default_nodemap);
982                 }
983         }
984
985         /* new nodemap config won't have an active/inactive record */
986         if (rc == 0 && loaded_global_idx == false) {
987                 struct nodemap_key       nk;
988                 union nodemap_rec        nr;
989
990                 nodemap_global_key_init(&nk);
991                 nodemap_global_rec_init(&nr, false);
992                 rc = nodemap_idx_insert(env, nodemap_idx, &nk, &nr);
993         }
994
995         if (rc == 0)
996                 nodemap_config_set_active(new_config);
997         else
998                 nodemap_config_dealloc(new_config);
999
1000         RETURN(rc);
1001 }
1002
1003 /**
1004  * Step through active config and write to disk.
1005  */
1006 struct dt_object *nodemap_save_config_cache(const struct lu_env *env,
1007                                             struct dt_device *dev,
1008                                             struct local_oid_storage *los)
1009 {
1010         struct dt_object *o;
1011         struct lu_nodemap *nodemap;
1012         struct lu_nodemap *nm_tmp;
1013         struct lu_nid_range *range;
1014         struct lu_nid_range *range_temp;
1015         struct lu_idmap *idmap;
1016         struct lu_idmap *id_tmp;
1017         struct rb_root root;
1018         struct nodemap_key nk;
1019         union nodemap_rec nr;
1020         LIST_HEAD(nodemap_list_head);
1021         int rc = 0, rc2;
1022
1023         ENTRY;
1024
1025         /* create a new index file to fill with active config */
1026         o = nodemap_cache_find_create(env, dev, los, NCFC_CREATE_NEW);
1027         if (IS_ERR(o))
1028                 RETURN(o);
1029
1030         mutex_lock(&active_config_lock);
1031
1032         /* convert hash to list so we don't spin */
1033         cfs_hash_for_each_safe(active_config->nmc_nodemap_hash,
1034                                nm_hash_list_cb, &nodemap_list_head);
1035
1036         list_for_each_entry_safe(nodemap, nm_tmp, &nodemap_list_head, nm_list) {
1037                 nodemap_cluster_key_init(&nk, nodemap->nm_id);
1038                 nodemap_cluster_rec_init(&nr, nodemap);
1039
1040                 rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1041                 if (rc2 < 0) {
1042                         rc = rc2;
1043                         continue;
1044                 }
1045
1046                 down_read(&active_config->nmc_range_tree_lock);
1047                 list_for_each_entry_safe(range, range_temp, &nodemap->nm_ranges,
1048                                          rn_list) {
1049                         lnet_nid_t nid[2] = {
1050                                 range->rn_start,
1051                                 range->rn_end
1052                         };
1053                         nodemap_range_key_init(&nk, nodemap->nm_id,
1054                                                range->rn_id);
1055                         nodemap_range_rec_init(&nr, nid);
1056                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1057                         if (rc2 < 0)
1058                                 rc = rc2;
1059                 }
1060                 up_read(&active_config->nmc_range_tree_lock);
1061
1062                 /* we don't need to take nm_idmap_lock because active config
1063                  * lock prevents changes from happening to nodemaps
1064                  */
1065                 root = nodemap->nm_client_to_fs_uidmap;
1066                 nm_rbtree_postorder_for_each_entry_safe(idmap, id_tmp, &root,
1067                                                         id_client_to_fs) {
1068                         nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_UID,
1069                                                idmap->id_client);
1070                         nodemap_idmap_rec_init(&nr, idmap->id_fs);
1071                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1072                         if (rc2 < 0)
1073                                 rc = rc2;
1074                 }
1075
1076                 root = nodemap->nm_client_to_fs_gidmap;
1077                 nm_rbtree_postorder_for_each_entry_safe(idmap, id_tmp, &root,
1078                                                         id_client_to_fs) {
1079                         nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_GID,
1080                                                idmap->id_client);
1081                         nodemap_idmap_rec_init(&nr, idmap->id_fs);
1082                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1083                         if (rc2 < 0)
1084                                 rc = rc2;
1085                 }
1086
1087                 root = nodemap->nm_client_to_fs_projidmap;
1088                 nm_rbtree_postorder_for_each_entry_safe(idmap, id_tmp, &root,
1089                                                         id_client_to_fs) {
1090                         nodemap_idmap_key_init(&nk, nodemap->nm_id,
1091                                                NODEMAP_PROJID,
1092                                                idmap->id_client);
1093                         nodemap_idmap_rec_init(&nr, idmap->id_fs);
1094                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1095                         if (rc2 < 0)
1096                                 rc = rc2;
1097                 }
1098         }
1099         nodemap_global_key_init(&nk);
1100         nodemap_global_rec_init(&nr, active_config->nmc_nodemap_is_active);
1101         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1102         if (rc2 < 0)
1103                 rc = rc2;
1104
1105         mutex_unlock(&active_config_lock);
1106
1107         if (rc < 0) {
1108                 dt_object_put(env, o);
1109                 o = ERR_PTR(rc);
1110         }
1111
1112         RETURN(o);
1113 }
1114
1115 static void nodemap_save_all_caches(void)
1116 {
1117         struct nm_config_file   *ncf;
1118         struct lu_env            env;
1119         int                      rc = 0;
1120
1121         /* recreating nodemap cache requires fld_thread_key be in env */
1122         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD | LCT_MG_THREAD);
1123         if (rc != 0) {
1124                 CWARN("cannot init env for nodemap config: rc = %d\n", rc);
1125                 return;
1126         }
1127
1128         mutex_lock(&ncf_list_lock);
1129         list_for_each_entry(ncf, &ncf_list_head, ncf_list) {
1130                 struct dt_device *dev = lu2dt_dev(ncf->ncf_obj->do_lu.lo_dev);
1131                 struct obd_device *obd = ncf->ncf_obj->do_lu.lo_dev->ld_obd;
1132                 struct dt_object *o;
1133
1134                 /* put current config file so save conf can rewrite it */
1135                 dt_object_put_nocache(&env, ncf->ncf_obj);
1136                 ncf->ncf_obj = NULL;
1137
1138                 o = nodemap_save_config_cache(&env, dev, ncf->ncf_los);
1139                 if (IS_ERR(o))
1140                         CWARN("%s: error writing to nodemap config: rc = %d\n",
1141                               obd->obd_name, rc);
1142                 else
1143                         ncf->ncf_obj = o;
1144         }
1145         mutex_unlock(&ncf_list_lock);
1146
1147         lu_env_fini(&env);
1148 }
1149
1150 /* tracks if config still needs to be loaded, either from disk or network */
1151 static bool nodemap_config_loaded;
1152 static DEFINE_MUTEX(nodemap_config_loaded_lock);
1153
1154 /**
1155  * Ensures that configs loaded over the wire are prioritized over those loaded
1156  * from disk.
1157  *
1158  * \param config        config to set as the active config
1159  */
1160 void nodemap_config_set_active_mgc(struct nodemap_config *config)
1161 {
1162         mutex_lock(&nodemap_config_loaded_lock);
1163         nodemap_config_set_active(config);
1164         nodemap_config_loaded = true;
1165         nodemap_save_all_caches();
1166         mutex_unlock(&nodemap_config_loaded_lock);
1167 }
1168 EXPORT_SYMBOL(nodemap_config_set_active_mgc);
1169
1170 /**
1171  * Register a dt_object representing the config index file. This should be
1172  * called by targets in order to load the nodemap configuration from disk. The
1173  * dt_object should be created with local_index_find_or_create and the index
1174  * features should be enabled with do_index_try.
1175  *
1176  * \param obj   dt_object returned by local_index_find_or_create
1177  *
1178  * \retval      on success: nm_config_file handle for later deregistration
1179  * \retval      -ENOMEM         memory allocation failure
1180  * \retval      -ENOENT         error loading nodemap config
1181  * \retval      -EINVAL         error loading nodemap config
1182  * \retval      -EEXIST         nodemap config already registered for MGS
1183  */
1184 struct nm_config_file *nm_config_file_register_mgs(const struct lu_env *env,
1185                                                    struct dt_object *obj,
1186                                                    struct local_oid_storage *los)
1187 {
1188         struct nm_config_file *ncf;
1189         int rc = 0;
1190         ENTRY;
1191
1192         if (nodemap_mgs_ncf != NULL)
1193                 GOTO(out, ncf = ERR_PTR(-EEXIST));
1194
1195         OBD_ALLOC_PTR(ncf);
1196         if (ncf == NULL)
1197                 GOTO(out, ncf = ERR_PTR(-ENOMEM));
1198
1199         /* if loading from cache, prevent activation of MGS config until cache
1200          * loading is done, so disk config is overwritten by MGS config.
1201          */
1202         mutex_lock(&nodemap_config_loaded_lock);
1203         rc = nodemap_load_entries(env, obj);
1204         if (!rc)
1205                 nodemap_config_loaded = true;
1206         mutex_unlock(&nodemap_config_loaded_lock);
1207
1208         if (rc) {
1209                 OBD_FREE_PTR(ncf);
1210                 GOTO(out, ncf = ERR_PTR(rc));
1211         }
1212
1213         lu_object_get(&obj->do_lu);
1214
1215         ncf->ncf_obj = obj;
1216         ncf->ncf_los = los;
1217
1218         nodemap_mgs_ncf = ncf;
1219
1220 out:
1221         return ncf;
1222 }
1223 EXPORT_SYMBOL(nm_config_file_register_mgs);
1224
1225 struct nm_config_file *nm_config_file_register_tgt(const struct lu_env *env,
1226                                                    struct dt_device *dev,
1227                                                    struct local_oid_storage *los)
1228 {
1229         struct nm_config_file *ncf;
1230         struct dt_object *config_obj = NULL;
1231         int rc = 0;
1232
1233         OBD_ALLOC_PTR(ncf);
1234         if (ncf == NULL)
1235                 RETURN(ERR_PTR(-ENOMEM));
1236
1237         /* don't load from cache if config already loaded */
1238         mutex_lock(&nodemap_config_loaded_lock);
1239         if (!nodemap_config_loaded) {
1240                 config_obj = nodemap_cache_find_create(env, dev, los, 0);
1241                 if (IS_ERR(config_obj))
1242                         rc = PTR_ERR(config_obj);
1243                 else
1244                         rc = nodemap_load_entries(env, config_obj);
1245
1246                 if (!rc)
1247                         nodemap_config_loaded = true;
1248         }
1249         mutex_unlock(&nodemap_config_loaded_lock);
1250         if (rc)
1251                 GOTO(out_ncf, rc);
1252
1253         /* sync on disk caches w/ loaded config in memory, ncf_obj may change */
1254         if (!config_obj) {
1255                 config_obj = nodemap_save_config_cache(env, dev, los);
1256                 if (IS_ERR(config_obj))
1257                         GOTO(out_ncf, rc = PTR_ERR(config_obj));
1258         }
1259
1260         ncf->ncf_obj = config_obj;
1261         ncf->ncf_los = los;
1262
1263         mutex_lock(&ncf_list_lock);
1264         list_add(&ncf->ncf_list, &ncf_list_head);
1265         mutex_unlock(&ncf_list_lock);
1266
1267 out_ncf:
1268         if (rc) {
1269                 OBD_FREE_PTR(ncf);
1270                 RETURN(ERR_PTR(rc));
1271         }
1272
1273         RETURN(ncf);
1274 }
1275 EXPORT_SYMBOL(nm_config_file_register_tgt);
1276
1277 /**
1278  * Deregister a nm_config_file. Should be called by targets during cleanup.
1279  *
1280  * \param ncf   config file to deregister
1281  */
1282 void nm_config_file_deregister_mgs(const struct lu_env *env,
1283                                    struct nm_config_file *ncf)
1284 {
1285         ENTRY;
1286         LASSERT(nodemap_mgs_ncf == ncf);
1287
1288         nodemap_mgs_ncf = NULL;
1289         if (ncf->ncf_obj)
1290                 dt_object_put(env, ncf->ncf_obj);
1291
1292         OBD_FREE_PTR(ncf);
1293
1294         EXIT;
1295 }
1296 EXPORT_SYMBOL(nm_config_file_deregister_mgs);
1297
1298 void nm_config_file_deregister_tgt(const struct lu_env *env,
1299                                    struct nm_config_file *ncf)
1300 {
1301         ENTRY;
1302
1303         if (ncf == NULL)
1304                 return;
1305
1306         mutex_lock(&ncf_list_lock);
1307         list_del(&ncf->ncf_list);
1308         mutex_unlock(&ncf_list_lock);
1309
1310         if (ncf->ncf_obj)
1311                 dt_object_put(env, ncf->ncf_obj);
1312
1313         OBD_FREE_PTR(ncf);
1314
1315         EXIT;
1316 }
1317 EXPORT_SYMBOL(nm_config_file_deregister_tgt);
1318
1319 int nodemap_process_idx_pages(struct nodemap_config *config, union lu_page *lip,
1320                               struct lu_nodemap **recent_nodemap)
1321 {
1322         struct nodemap_key *key;
1323         union nodemap_rec *rec;
1324         char *entry;
1325         int j;
1326         int k;
1327         int rc = 0;
1328         int size = dt_nodemap_features.dif_keysize_max +
1329                    dt_nodemap_features.dif_recsize_max;
1330         ENTRY;
1331
1332         for (j = 0; j < LU_PAGE_COUNT; j++) {
1333                 if (lip->lp_idx.lip_magic != LIP_MAGIC)
1334                         return -EINVAL;
1335
1336                 /* get and process keys and records from page */
1337                 for (k = 0; k < lip->lp_idx.lip_nr; k++) {
1338                         entry = lip->lp_idx.lip_entries + k * size;
1339                         key = (struct nodemap_key *)entry;
1340
1341                         entry += dt_nodemap_features.dif_keysize_max;
1342                         rec = (union nodemap_rec *)entry;
1343
1344                         rc = nodemap_process_keyrec(config, key, rec,
1345                                                     recent_nodemap);
1346                         if (rc < 0)
1347                                 return rc;
1348                 }
1349                 lip++;
1350         }
1351
1352         EXIT;
1353         return 0;
1354 }
1355 EXPORT_SYMBOL(nodemap_process_idx_pages);
1356
1357 static int nodemap_page_build(const struct lu_env *env, union lu_page *lp,
1358                               size_t nob, const struct dt_it_ops *iops,
1359                               struct dt_it *it, __u32 attr, void *arg)
1360 {
1361         struct idx_info *ii = (struct idx_info *)arg;
1362         struct lu_idxpage *lip = &lp->lp_idx;
1363         char *entry;
1364         size_t size = ii->ii_keysize + ii->ii_recsize;
1365         int rc;
1366         ENTRY;
1367
1368         if (nob < LIP_HDR_SIZE)
1369                 return -EINVAL;
1370
1371         /* initialize the header of the new container */
1372         memset(lip, 0, LIP_HDR_SIZE);
1373         lip->lip_magic = LIP_MAGIC;
1374         nob           -= LIP_HDR_SIZE;
1375
1376         entry = lip->lip_entries;
1377         do {
1378                 char            *tmp_entry = entry;
1379                 struct dt_key   *key;
1380                 __u64           hash;
1381                 enum nodemap_idx_type key_type;
1382
1383                 /* fetch 64-bit hash value */
1384                 hash = iops->store(env, it);
1385                 ii->ii_hash_end = hash;
1386
1387                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_IDX_READ_BREAK)) {
1388                         if (lip->lip_nr != 0)
1389                                 GOTO(out, rc = 0);
1390                 }
1391
1392                 if (nob < size) {
1393                         if (lip->lip_nr == 0)
1394                                 GOTO(out, rc = -EINVAL);
1395                         GOTO(out, rc = 0);
1396                 }
1397
1398                 key = iops->key(env, it);
1399                 key_type = nodemap_get_key_type((struct nodemap_key *)key);
1400
1401                 /* on the first pass, get only the cluster types. On second
1402                  * pass, get all the rest */
1403                 if ((ii->ii_attrs == NM_READ_CLUSTERS &&
1404                                 key_type == NODEMAP_CLUSTER_IDX) ||
1405                     (ii->ii_attrs == NM_READ_ATTRIBUTES &&
1406                                 key_type != NODEMAP_CLUSTER_IDX &&
1407                                 key_type != NODEMAP_EMPTY_IDX)) {
1408                         memcpy(tmp_entry, key, ii->ii_keysize);
1409                         tmp_entry += ii->ii_keysize;
1410
1411                         /* and finally the record */
1412                         rc = iops->rec(env, it, (struct dt_rec *)tmp_entry,
1413                                        attr);
1414                         if (rc != -ESTALE) {
1415                                 if (rc != 0)
1416                                         GOTO(out, rc);
1417
1418                                 /* hash/key/record successfully copied! */
1419                                 lip->lip_nr++;
1420                                 if (unlikely(lip->lip_nr == 1 &&
1421                                     ii->ii_count == 0))
1422                                         ii->ii_hash_start = hash;
1423
1424                                 entry = tmp_entry + ii->ii_recsize;
1425                                 nob -= size;
1426                         }
1427                 }
1428
1429                 /* move on to the next record */
1430                 do {
1431                         rc = iops->next(env, it);
1432                 } while (rc == -ESTALE);
1433
1434                 /* move to second pass */
1435                 if (rc > 0 && ii->ii_attrs == NM_READ_CLUSTERS) {
1436                         ii->ii_attrs = NM_READ_ATTRIBUTES;
1437                         rc = iops->load(env, it, 0);
1438                         if (rc == 0)
1439                                 rc = iops->next(env, it);
1440                         else if (rc > 0)
1441                                 rc = 0;
1442                         else
1443                                 GOTO(out, rc);
1444                 }
1445
1446         } while (rc == 0);
1447
1448         GOTO(out, rc);
1449 out:
1450         if (rc >= 0 && lip->lip_nr > 0)
1451                 /* one more container */
1452                 ii->ii_count++;
1453         if (rc > 0)
1454                 /* no more entries */
1455                 ii->ii_hash_end = II_END_OFF;
1456         return rc;
1457 }
1458
1459
1460 int nodemap_index_read(struct lu_env *env,
1461                        struct nm_config_file *ncf,
1462                        struct idx_info *ii,
1463                        const struct lu_rdpg *rdpg)
1464 {
1465         struct dt_object        *nodemap_idx = ncf->ncf_obj;
1466         __u64                    version;
1467         int                      rc = 0;
1468
1469         ii->ii_keysize = dt_nodemap_features.dif_keysize_max;
1470         ii->ii_recsize = dt_nodemap_features.dif_recsize_max;
1471
1472         dt_read_lock(env, nodemap_idx, 0);
1473         version = dt_version_get(env, nodemap_idx);
1474         if (rdpg->rp_hash != 0 && ii->ii_version != version) {
1475                 CDEBUG(D_INFO, "nodemap config changed inflight, old %llu, new %llu\n",
1476                        ii->ii_version,
1477                        version);
1478                 ii->ii_hash_end = 0;
1479         } else {
1480                 rc = dt_index_walk(env, nodemap_idx, rdpg, nodemap_page_build,
1481                                    ii);
1482                 CDEBUG(D_INFO, "walked index, hashend %llx\n", ii->ii_hash_end);
1483         }
1484
1485         if (rc >= 0)
1486                 ii->ii_version = version;
1487
1488         dt_read_unlock(env, nodemap_idx);
1489         return rc;
1490 }
1491 EXPORT_SYMBOL(nodemap_index_read);
1492
1493 /**
1494  * Returns the current nodemap configuration to MGC by walking the nodemap
1495  * config index and storing it in the response buffer.
1496  *
1497  * \param       req             incoming MGS_CONFIG_READ request
1498  * \retval      0               success
1499  * \retval      -EINVAL         malformed request
1500  * \retval      -ENOTCONN       client evicted/reconnected already
1501  * \retval      -ETIMEDOUT      client timeout or network error
1502  * \retval      -ENOMEM
1503  */
1504 int nodemap_get_config_req(struct obd_device *mgs_obd,
1505                            struct ptlrpc_request *req)
1506 {
1507         const struct ptlrpc_bulk_frag_ops *frag_ops = &ptlrpc_bulk_kiov_pin_ops;
1508         struct mgs_config_body *body;
1509         struct mgs_config_res *res;
1510         struct lu_rdpg rdpg;
1511         struct idx_info nodemap_ii;
1512         struct ptlrpc_bulk_desc *desc;
1513         struct tg_export_data *rqexp_ted = &req->rq_export->exp_target_data;
1514         int i;
1515         int page_count;
1516         int bytes = 0;
1517         int rc = 0;
1518
1519         body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
1520         if (!body)
1521                 RETURN(-EINVAL);
1522
1523         if (body->mcb_type != MGS_CFG_T_NODEMAP)
1524                 RETURN(-EINVAL);
1525
1526         rdpg.rp_count = (body->mcb_units << body->mcb_bits);
1527         rdpg.rp_npages = (rdpg.rp_count + PAGE_SIZE - 1) >>
1528                 PAGE_SHIFT;
1529         if (rdpg.rp_npages > PTLRPC_MAX_BRW_PAGES)
1530                 RETURN(-EINVAL);
1531
1532         CDEBUG(D_INFO, "reading nodemap log, name '%s', size = %u\n",
1533                body->mcb_name, rdpg.rp_count);
1534
1535         /* allocate pages to store the containers */
1536         OBD_ALLOC_PTR_ARRAY(rdpg.rp_pages, rdpg.rp_npages);
1537         if (rdpg.rp_pages == NULL)
1538                 RETURN(-ENOMEM);
1539         for (i = 0; i < rdpg.rp_npages; i++) {
1540                 rdpg.rp_pages[i] = alloc_page(GFP_NOFS);
1541                 if (rdpg.rp_pages[i] == NULL)
1542                         GOTO(out, rc = -ENOMEM);
1543         }
1544
1545         rdpg.rp_hash = body->mcb_offset;
1546         nodemap_ii.ii_magic = IDX_INFO_MAGIC;
1547         nodemap_ii.ii_flags = II_FL_NOHASH;
1548         nodemap_ii.ii_version = rqexp_ted->ted_nodemap_version;
1549         nodemap_ii.ii_attrs = body->mcb_nm_cur_pass;
1550
1551         bytes = nodemap_index_read(req->rq_svc_thread->t_env,
1552                                    mgs_obd->u.obt.obt_nodemap_config_file,
1553                                    &nodemap_ii, &rdpg);
1554         if (bytes < 0)
1555                 GOTO(out, rc = bytes);
1556
1557         rqexp_ted->ted_nodemap_version = nodemap_ii.ii_version;
1558
1559         res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
1560         if (res == NULL)
1561                 GOTO(out, rc = -EINVAL);
1562         res->mcr_offset = nodemap_ii.ii_hash_end;
1563         res->mcr_nm_cur_pass = nodemap_ii.ii_attrs;
1564
1565         page_count = (bytes + PAGE_SIZE - 1) >> PAGE_SHIFT;
1566         LASSERT(page_count <= rdpg.rp_count);
1567         desc = ptlrpc_prep_bulk_exp(req, page_count, 1,
1568                                     PTLRPC_BULK_PUT_SOURCE,
1569                                     MGS_BULK_PORTAL, frag_ops);
1570         if (desc == NULL)
1571                 GOTO(out, rc = -ENOMEM);
1572
1573         for (i = 0; i < page_count && bytes > 0; i++) {
1574                 frag_ops->add_kiov_frag(desc, rdpg.rp_pages[i], 0,
1575                                         min_t(int, bytes, PAGE_SIZE));
1576                 bytes -= PAGE_SIZE;
1577         }
1578
1579         rc = target_bulk_io(req->rq_export, desc);
1580         ptlrpc_free_bulk(desc);
1581
1582 out:
1583         if (rdpg.rp_pages != NULL) {
1584                 for (i = 0; i < rdpg.rp_npages; i++)
1585                         if (rdpg.rp_pages[i] != NULL)
1586                                 __free_page(rdpg.rp_pages[i]);
1587                 OBD_FREE_PTR_ARRAY(rdpg.rp_pages, rdpg.rp_npages);
1588         }
1589         return rc;
1590 }
1591 EXPORT_SYMBOL(nodemap_get_config_req);