Whamcloud - gitweb
LU-10806 target: skip discard for a missing obt_lut
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
49 static DEFINE_SPINLOCK(obd_types_lock); /* held while walking or modifying obd_types */
50 static LIST_HEAD(obd_types); /* registered struct obd_type entries, linked through typ_chain */
51 DEFINE_RWLOCK(obd_dev_lock); /* taken around obd_devs[] scans and slot updates */
52 static struct obd_device *obd_devs[MAX_OBD_DEVICES]; /* device table; slot index is the device's obd_minor */
53
54 static struct kmem_cache *obd_device_cachep; /* slab cache for struct obd_device ("ll_obd_dev_cache") */
55 struct kmem_cache *obdo_cachep; /* slab cache for struct obdo ("ll_obdo_cache") */
56 EXPORT_SYMBOL(obdo_cachep);
57
58 static struct workqueue_struct *zombie_wq; /* NOTE(review): presumably used for deferred export/import teardown - confirm */
59
60 static void obd_zombie_export_add(struct obd_export *exp); /* forward declaration; defined later in this file */
61 static void obd_zombie_import_add(struct obd_import *imp); /* forward declaration; defined later in this file */
62 static void print_export_data(struct obd_export *exp,
63                               const char *status, int locks, int debug_level); /* forward declaration */
64
65 static LIST_HEAD(obd_stale_exports); /* NOTE(review): stale-export bookkeeping; users are outside this excerpt */
66 static DEFINE_SPINLOCK(obd_stale_export_lock); /* presumably guards obd_stale_exports - confirm */
67 static atomic_t obd_stale_export_num = ATOMIC_INIT(0); /* presumably the length of obd_stale_exports - confirm */
68
69 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c); /* exported hook; not assigned anywhere in this excerpt */
70 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71
72 /*
73  * support functions: we could use inter-module communication, but this
74  * is more portable to other OS's
75  */
76 static struct obd_device *obd_device_alloc(void)
77 {
78         struct obd_device *obd;
79
80         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
81         if (obd != NULL) {
82                 obd->obd_magic = OBD_DEVICE_MAGIC;
83         }
84         return obd;
85 }
86
87 static void obd_device_free(struct obd_device *obd)
88 {
89         LASSERT(obd != NULL);
90         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
91                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
92         if (obd->obd_namespace != NULL) {
93                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
94                        obd, obd->obd_namespace, obd->obd_force);
95                 LBUG();
96         }
97         lu_ref_fini(&obd->obd_reference);
98         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 }
100
101 struct obd_type *class_search_type(const char *name)
102 {
103         struct list_head *tmp;
104         struct obd_type *type;
105
106         spin_lock(&obd_types_lock);
107         list_for_each(tmp, &obd_types) {
108                 type = list_entry(tmp, struct obd_type, typ_chain);
109                 if (strcmp(type->typ_name, name) == 0) {
110                         spin_unlock(&obd_types_lock);
111                         return type;
112                 }
113         }
114         spin_unlock(&obd_types_lock);
115         return NULL;
116 }
117 EXPORT_SYMBOL(class_search_type);
118
119 struct obd_type *class_get_type(const char *name)
120 {
121         struct obd_type *type = class_search_type(name);
122
123 #ifdef HAVE_MODULE_LOADING_SUPPORT
124         if (!type) {
125                 const char *modname = name;
126
127                 if (strcmp(modname, "obdfilter") == 0)
128                         modname = "ofd";
129
130                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
131                         modname = LUSTRE_OSP_NAME;
132
133                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
134                         modname = LUSTRE_MDT_NAME;
135
136                 if (!request_module("%s", modname)) {
137                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
138                         type = class_search_type(name);
139                 } else {
140                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
141                                            modname);
142                 }
143         }
144 #endif
145         if (type) {
146                 spin_lock(&type->obd_type_lock);
147                 type->typ_refcnt++;
148                 try_module_get(type->typ_dt_ops->o_owner);
149                 spin_unlock(&type->obd_type_lock);
150         }
151         return type;
152 }
153
154 void class_put_type(struct obd_type *type)
155 {
156         LASSERT(type);
157         spin_lock(&type->obd_type_lock);
158         type->typ_refcnt--;
159         module_put(type->typ_dt_ops->o_owner);
160         spin_unlock(&type->obd_type_lock);
161 }
162
163 static void class_sysfs_release(struct kobject *kobj)
164 {
165         OBD_FREE(kobj, sizeof(*kobj));
166 }
167
168 static struct kobj_type class_ktype = {
169         .sysfs_ops      = &lustre_sysfs_ops,
170         .release        = class_sysfs_release,
171 };
172
173 struct kobject *class_setup_tunables(const char *name)
174 {
175         struct kobject *kobj;
176         int rc;
177
178 #ifdef HAVE_SERVER_SUPPORT
179         kobj = kset_find_obj(lustre_kset, name);
180         if (kobj)
181                 return kobj;
182 #endif
183         OBD_ALLOC(kobj, sizeof(*kobj));
184         if (!kobj)
185                 return ERR_PTR(-ENOMEM);
186
187         kobj->kset = lustre_kset;
188         kobject_init(kobj, &class_ktype);
189         rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
190         if (rc) {
191                 kobject_put(kobj);
192                 return ERR_PTR(rc);
193         }
194         return kobj;
195 }
196 EXPORT_SYMBOL(class_setup_tunables);
197
198 #define CLASS_MAX_NAME 1024
199
200 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
201                         bool enable_proc, struct lprocfs_vars *vars,
202                         const char *name, struct lu_device_type *ldt)
203 {
204         struct obd_type *type;
205 #ifdef HAVE_SERVER_SUPPORT
206         struct qstr dname;
207 #endif /* HAVE_SERVER_SUPPORT */
208         int rc = 0;
209
210         ENTRY;
211         /* sanity check */
212         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
213
214         if (class_search_type(name)) {
215                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
216                 RETURN(-EEXIST);
217         }
218
219         rc = -ENOMEM;
220         OBD_ALLOC(type, sizeof(*type));
221         if (type == NULL)
222                 RETURN(rc);
223
224         OBD_ALLOC_PTR(type->typ_dt_ops);
225         OBD_ALLOC_PTR(type->typ_md_ops);
226         OBD_ALLOC(type->typ_name, strlen(name) + 1);
227
228         if (type->typ_dt_ops == NULL ||
229             type->typ_md_ops == NULL ||
230             type->typ_name == NULL)
231                 GOTO (failed, rc);
232
233         *(type->typ_dt_ops) = *dt_ops;
234         /* md_ops is optional */
235         if (md_ops)
236                 *(type->typ_md_ops) = *md_ops;
237         strcpy(type->typ_name, name);
238         spin_lock_init(&type->obd_type_lock);
239
240 #ifdef CONFIG_PROC_FS
241         if (enable_proc) {
242                 type->typ_procroot = lprocfs_register(type->typ_name,
243                                                       proc_lustre_root,
244                                                       vars, type);
245                 if (IS_ERR(type->typ_procroot)) {
246                         rc = PTR_ERR(type->typ_procroot);
247                         type->typ_procroot = NULL;
248                         GOTO(failed, rc);
249                 }
250         }
251 #endif
252 #ifdef HAVE_SERVER_SUPPORT
253         dname.name = name;
254         dname.len = strlen(dname.name);
255         dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
256                                        dname.len);
257         type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
258         if (type->typ_debugfs_entry) {
259                 dput(type->typ_debugfs_entry);
260                 type->typ_sym_filter = true;
261                 goto dir_exist;
262         }
263 #endif /* HAVE_SERVER_SUPPORT */
264
265         type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
266                                                     debugfs_lustre_root,
267                                                     NULL, type);
268         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
269                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
270                                              : -ENOMEM;
271                 type->typ_debugfs_entry = NULL;
272                 GOTO(failed, rc);
273         }
274 #ifdef HAVE_SERVER_SUPPORT
275 dir_exist:
276 #endif
277         type->typ_kobj = class_setup_tunables(type->typ_name);
278         if (IS_ERR(type->typ_kobj))
279                 GOTO(failed, rc = PTR_ERR(type->typ_kobj));
280
281         if (ldt) {
282                 type->typ_lu = ldt;
283                 rc = lu_device_type_init(ldt);
284                 if (rc) {
285                         kobject_put(type->typ_kobj);
286                         GOTO(failed, rc);
287                 }
288         }
289
290         spin_lock(&obd_types_lock);
291         list_add(&type->typ_chain, &obd_types);
292         spin_unlock(&obd_types_lock);
293
294         RETURN(0);
295
296 failed:
297 #ifdef HAVE_SERVER_SUPPORT
298         if (type->typ_sym_filter)
299                 type->typ_debugfs_entry = NULL;
300 #endif
301         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
302                 ldebugfs_remove(&type->typ_debugfs_entry);
303         if (type->typ_name != NULL) {
304 #ifdef CONFIG_PROC_FS
305                 if (type->typ_procroot != NULL)
306                         remove_proc_subtree(type->typ_name, proc_lustre_root);
307 #endif
308                 OBD_FREE(type->typ_name, strlen(name) + 1);
309         }
310         if (type->typ_md_ops != NULL)
311                 OBD_FREE_PTR(type->typ_md_ops);
312         if (type->typ_dt_ops != NULL)
313                 OBD_FREE_PTR(type->typ_dt_ops);
314         OBD_FREE(type, sizeof(*type));
315         RETURN(rc);
316 }
317 EXPORT_SYMBOL(class_register_type);
318
319 int class_unregister_type(const char *name)
320 {
321         struct obd_type *type = class_search_type(name);
322         ENTRY;
323
324         if (!type) {
325                 CERROR("unknown obd type\n");
326                 RETURN(-EINVAL);
327         }
328
329         if (type->typ_refcnt) {
330                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
331                 /* This is a bad situation, let's make the best of it */
332                 /* Remove ops, but leave the name for debugging */
333                 OBD_FREE_PTR(type->typ_dt_ops);
334                 OBD_FREE_PTR(type->typ_md_ops);
335                 RETURN(-EBUSY);
336         }
337
338         kobject_put(type->typ_kobj);
339
340         /* we do not use type->typ_procroot as for compatibility purposes
341          * other modules can share names (i.e. lod can use lov entry). so
342          * we can't reference pointer as it can get invalided when another
343          * module removes the entry */
344 #ifdef CONFIG_PROC_FS
345         if (type->typ_procroot != NULL)
346                 remove_proc_subtree(type->typ_name, proc_lustre_root);
347         if (type->typ_procsym != NULL)
348                 lprocfs_remove(&type->typ_procsym);
349 #endif
350 #ifdef HAVE_SERVER_SUPPORT
351         if (type->typ_sym_filter)
352                 type->typ_debugfs_entry = NULL;
353 #endif
354         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
355                 ldebugfs_remove(&type->typ_debugfs_entry);
356
357         if (type->typ_lu)
358                 lu_device_type_fini(type->typ_lu);
359
360         spin_lock(&obd_types_lock);
361         list_del(&type->typ_chain);
362         spin_unlock(&obd_types_lock);
363         OBD_FREE(type->typ_name, strlen(name) + 1);
364         if (type->typ_dt_ops != NULL)
365                 OBD_FREE_PTR(type->typ_dt_ops);
366         if (type->typ_md_ops != NULL)
367                 OBD_FREE_PTR(type->typ_md_ops);
368         OBD_FREE(type, sizeof(*type));
369         RETURN(0);
370 } /* class_unregister_type */
371 EXPORT_SYMBOL(class_unregister_type);
372
373 /**
374  * Create a new obd device.
375  *
376  * Allocate the new obd_device and initialize it.
377  *
378  * \param[in] type_name obd device type string.
379  * \param[in] name      obd device name.
380  * \param[in] uuid      obd device UUID
381  *
382  * \retval newdev         pointer to created obd_device
383  * \retval ERR_PTR(errno) on error
384  */
385 struct obd_device *class_newdev(const char *type_name, const char *name,
386                                 const char *uuid)
387 {
388         struct obd_device *newdev;
389         struct obd_type *type = NULL;
390         ENTRY;
391
392         if (strlen(name) >= MAX_OBD_NAME) {
393                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
394                 RETURN(ERR_PTR(-EINVAL));
395         }
396
397         type = class_get_type(type_name);
398         if (type == NULL){
399                 CERROR("OBD: unknown type: %s\n", type_name);
400                 RETURN(ERR_PTR(-ENODEV));
401         }
402
403         newdev = obd_device_alloc();
404         if (newdev == NULL) {
405                 class_put_type(type);
406                 RETURN(ERR_PTR(-ENOMEM));
407         }
408         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
409         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
410         newdev->obd_type = type;
411         newdev->obd_minor = -1;
412
413         rwlock_init(&newdev->obd_pool_lock);
414         newdev->obd_pool_limit = 0;
415         newdev->obd_pool_slv = 0;
416
417         INIT_LIST_HEAD(&newdev->obd_exports);
418         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
419         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
420         INIT_LIST_HEAD(&newdev->obd_exports_timed);
421         INIT_LIST_HEAD(&newdev->obd_nid_stats);
422         spin_lock_init(&newdev->obd_nid_lock);
423         spin_lock_init(&newdev->obd_dev_lock);
424         mutex_init(&newdev->obd_dev_mutex);
425         spin_lock_init(&newdev->obd_osfs_lock);
426         /* newdev->obd_osfs_age must be set to a value in the distant
427          * past to guarantee a fresh statfs is fetched on mount. */
428         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
429
430         /* XXX belongs in setup not attach  */
431         init_rwsem(&newdev->obd_observer_link_sem);
432         /* recovery data */
433         spin_lock_init(&newdev->obd_recovery_task_lock);
434         init_waitqueue_head(&newdev->obd_next_transno_waitq);
435         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
436         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
437         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
438         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
439         INIT_LIST_HEAD(&newdev->obd_evict_list);
440         INIT_LIST_HEAD(&newdev->obd_lwp_list);
441
442         llog_group_init(&newdev->obd_olg);
443         /* Detach drops this */
444         atomic_set(&newdev->obd_refcount, 1);
445         lu_ref_init(&newdev->obd_reference);
446         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
447
448         newdev->obd_conn_inprogress = 0;
449
450         strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
451
452         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
453                newdev->obd_name, newdev);
454
455         return newdev;
456 }
457
458 /**
459  * Free obd device.
460  *
461  * \param[in] obd obd_device to be freed
462  *
463  * \retval none
464  */
465 void class_free_dev(struct obd_device *obd)
466 {
467         struct obd_type *obd_type = obd->obd_type;
468
469         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
470                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
471         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
472                  "obd %p != obd_devs[%d] %p\n",
473                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
474         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
475                  "obd_refcount should be 0, not %d\n",
476                  atomic_read(&obd->obd_refcount));
477         LASSERT(obd_type != NULL);
478
479         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
480                obd->obd_name, obd->obd_type->typ_name);
481
482         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
483                          obd->obd_name, obd->obd_uuid.uuid);
484         if (obd->obd_stopping) {
485                 int err;
486
487                 /* If we're not stopping, we were never set up */
488                 err = obd_cleanup(obd);
489                 if (err)
490                         CERROR("Cleanup %s returned %d\n",
491                                 obd->obd_name, err);
492         }
493
494         obd_device_free(obd);
495
496         class_put_type(obd_type);
497 }
498
499 /**
500  * Unregister obd device.
501  *
502  * Free slot in obd_dev[] used by \a obd.
503  *
504  * \param[in] new_obd obd_device to be unregistered
505  *
506  * \retval none
507  */
508 void class_unregister_device(struct obd_device *obd)
509 {
510         write_lock(&obd_dev_lock);
511         if (obd->obd_minor >= 0) {
512                 LASSERT(obd_devs[obd->obd_minor] == obd);
513                 obd_devs[obd->obd_minor] = NULL;
514                 obd->obd_minor = -1;
515         }
516         write_unlock(&obd_dev_lock);
517 }
518
519 /**
520  * Register obd device.
521  *
522  * Find free slot in obd_devs[], fills it with \a new_obd.
523  *
524  * \param[in] new_obd obd_device to be registered
525  *
526  * \retval 0          success
527  * \retval -EEXIST    device with this name is registered
528  * \retval -EOVERFLOW obd_devs[] is full
529  */
530 int class_register_device(struct obd_device *new_obd)
531 {
532         int ret = 0;
533         int i;
534         int new_obd_minor = 0;
535         bool minor_assign = false;
536         bool retried = false;
537
538 again:
539         write_lock(&obd_dev_lock);
540         for (i = 0; i < class_devno_max(); i++) {
541                 struct obd_device *obd = class_num2obd(i);
542
543                 if (obd != NULL &&
544                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
545
546                         if (!retried) {
547                                 write_unlock(&obd_dev_lock);
548
549                                 /* the obd_device could be waited to be
550                                  * destroyed by the "obd_zombie_impexp_thread".
551                                  */
552                                 obd_zombie_barrier();
553                                 retried = true;
554                                 goto again;
555                         }
556
557                         CERROR("%s: already exists, won't add\n",
558                                obd->obd_name);
559                         /* in case we found a free slot before duplicate */
560                         minor_assign = false;
561                         ret = -EEXIST;
562                         break;
563                 }
564                 if (!minor_assign && obd == NULL) {
565                         new_obd_minor = i;
566                         minor_assign = true;
567                 }
568         }
569
570         if (minor_assign) {
571                 new_obd->obd_minor = new_obd_minor;
572                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
573                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
574                 obd_devs[new_obd_minor] = new_obd;
575         } else {
576                 if (ret == 0) {
577                         ret = -EOVERFLOW;
578                         CERROR("%s: all %u/%u devices used, increase "
579                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
580                                i, class_devno_max(), ret);
581                 }
582         }
583         write_unlock(&obd_dev_lock);
584
585         RETURN(ret);
586 }
587
588 static int class_name2dev_nolock(const char *name)
589 {
590         int i;
591
592         if (!name)
593                 return -1;
594
595         for (i = 0; i < class_devno_max(); i++) {
596                 struct obd_device *obd = class_num2obd(i);
597
598                 if (obd && strcmp(name, obd->obd_name) == 0) {
599                         /* Make sure we finished attaching before we give
600                            out any references */
601                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
602                         if (obd->obd_attached) {
603                                 return i;
604                         }
605                         break;
606                 }
607         }
608
609         return -1;
610 }
611
612 int class_name2dev(const char *name)
613 {
614         int i;
615
616         if (!name)
617                 return -1;
618
619         read_lock(&obd_dev_lock);
620         i = class_name2dev_nolock(name);
621         read_unlock(&obd_dev_lock);
622
623         return i;
624 }
625 EXPORT_SYMBOL(class_name2dev);
626
627 struct obd_device *class_name2obd(const char *name)
628 {
629         int dev = class_name2dev(name);
630
631         if (dev < 0 || dev > class_devno_max())
632                 return NULL;
633         return class_num2obd(dev);
634 }
635 EXPORT_SYMBOL(class_name2obd);
636
637 int class_uuid2dev_nolock(struct obd_uuid *uuid)
638 {
639         int i;
640
641         for (i = 0; i < class_devno_max(); i++) {
642                 struct obd_device *obd = class_num2obd(i);
643
644                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
645                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
646                         return i;
647                 }
648         }
649
650         return -1;
651 }
652
653 int class_uuid2dev(struct obd_uuid *uuid)
654 {
655         int i;
656
657         read_lock(&obd_dev_lock);
658         i = class_uuid2dev_nolock(uuid);
659         read_unlock(&obd_dev_lock);
660
661         return i;
662 }
663 EXPORT_SYMBOL(class_uuid2dev);
664
665 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
666 {
667         int dev = class_uuid2dev(uuid);
668         if (dev < 0)
669                 return NULL;
670         return class_num2obd(dev);
671 }
672 EXPORT_SYMBOL(class_uuid2obd);
673
674 /**
675  * Get obd device from ::obd_devs[]
676  *
677  * \param num [in] array index
678  *
679  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
680  *         otherwise return the obd device there.
681  */
682 struct obd_device *class_num2obd(int num)
683 {
684         struct obd_device *obd = NULL;
685
686         if (num < class_devno_max()) {
687                 obd = obd_devs[num];
688                 if (obd == NULL)
689                         return NULL;
690
691                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
692                          "%p obd_magic %08x != %08x\n",
693                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
694                 LASSERTF(obd->obd_minor == num,
695                          "%p obd_minor %0d != %0d\n",
696                          obd, obd->obd_minor, num);
697         }
698
699         return obd;
700 }
701
702 /**
703  * Find obd in obd_dev[] by name or uuid.
704  *
705  * Increment obd's refcount if found.
706  *
707  * \param[in] str obd name or uuid
708  *
709  * \retval NULL    if not found
710  * \retval target  pointer to found obd_device
711  */
712 struct obd_device *class_dev_by_str(const char *str)
713 {
714         struct obd_device *target = NULL;
715         struct obd_uuid tgtuuid;
716         int rc;
717
718         obd_str2uuid(&tgtuuid, str);
719
720         read_lock(&obd_dev_lock);
721         rc = class_uuid2dev_nolock(&tgtuuid);
722         if (rc < 0)
723                 rc = class_name2dev_nolock(str);
724
725         if (rc >= 0)
726                 target = class_num2obd(rc);
727
728         if (target != NULL)
729                 class_incref(target, "find", current);
730         read_unlock(&obd_dev_lock);
731
732         RETURN(target);
733 }
734 EXPORT_SYMBOL(class_dev_by_str);
735
736 /**
737  * Get obd devices count. Device in any
738  *    state are counted
739  * \retval obd device count
740  */
741 int get_devices_count(void)
742 {
743         int index, max_index = class_devno_max(), dev_count = 0;
744
745         read_lock(&obd_dev_lock);
746         for (index = 0; index <= max_index; index++) {
747                 struct obd_device *obd = class_num2obd(index);
748                 if (obd != NULL)
749                         dev_count++;
750         }
751         read_unlock(&obd_dev_lock);
752
753         return dev_count;
754 }
755 EXPORT_SYMBOL(get_devices_count);
756
757 void class_obd_list(void)
758 {
759         char *status;
760         int i;
761
762         read_lock(&obd_dev_lock);
763         for (i = 0; i < class_devno_max(); i++) {
764                 struct obd_device *obd = class_num2obd(i);
765
766                 if (obd == NULL)
767                         continue;
768                 if (obd->obd_stopping)
769                         status = "ST";
770                 else if (obd->obd_set_up)
771                         status = "UP";
772                 else if (obd->obd_attached)
773                         status = "AT";
774                 else
775                         status = "--";
776                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
777                          i, status, obd->obd_type->typ_name,
778                          obd->obd_name, obd->obd_uuid.uuid,
779                          atomic_read(&obd->obd_refcount));
780         }
781         read_unlock(&obd_dev_lock);
782         return;
783 }
784
785 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
786    specified, then only the client with that uuid is returned,
787    otherwise any client connected to the tgt is returned. */
788 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
789                                           const char * typ_name,
790                                           struct obd_uuid *grp_uuid)
791 {
792         int i;
793
794         read_lock(&obd_dev_lock);
795         for (i = 0; i < class_devno_max(); i++) {
796                 struct obd_device *obd = class_num2obd(i);
797
798                 if (obd == NULL)
799                         continue;
800                 if ((strncmp(obd->obd_type->typ_name, typ_name,
801                              strlen(typ_name)) == 0)) {
802                         if (obd_uuid_equals(tgt_uuid,
803                                             &obd->u.cli.cl_target_uuid) &&
804                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
805                                                          &obd->obd_uuid) : 1)) {
806                                 read_unlock(&obd_dev_lock);
807                                 return obd;
808                         }
809                 }
810         }
811         read_unlock(&obd_dev_lock);
812
813         return NULL;
814 }
815 EXPORT_SYMBOL(class_find_client_obd);
816
817 /* Iterate the obd_device list looking devices have grp_uuid. Start
818    searching at *next, and if a device is found, the next index to look
819    at is saved in *next. If next is NULL, then the first matching device
820    will always be returned. */
821 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
822 {
823         int i;
824
825         if (next == NULL)
826                 i = 0;
827         else if (*next >= 0 && *next < class_devno_max())
828                 i = *next;
829         else
830                 return NULL;
831
832         read_lock(&obd_dev_lock);
833         for (; i < class_devno_max(); i++) {
834                 struct obd_device *obd = class_num2obd(i);
835
836                 if (obd == NULL)
837                         continue;
838                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
839                         if (next != NULL)
840                                 *next = i+1;
841                         read_unlock(&obd_dev_lock);
842                         return obd;
843                 }
844         }
845         read_unlock(&obd_dev_lock);
846
847         return NULL;
848 }
849 EXPORT_SYMBOL(class_devices_in_group);
850
851 /**
852  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
853  * adjust sptlrpc settings accordingly.
854  */
855 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
856 {
857         struct obd_device  *obd;
858         const char         *type;
859         int                 i, rc = 0, rc2;
860
861         LASSERT(namelen > 0);
862
863         read_lock(&obd_dev_lock);
864         for (i = 0; i < class_devno_max(); i++) {
865                 obd = class_num2obd(i);
866
867                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
868                         continue;
869
870                 /* only notify mdc, osc, osp, lwp, mdt, ost
871                  * because only these have a -sptlrpc llog */
872                 type = obd->obd_type->typ_name;
873                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
874                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
875                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
876                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
877                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
878                     strcmp(type, LUSTRE_OST_NAME) != 0)
879                         continue;
880
881                 if (strncmp(obd->obd_name, fsname, namelen))
882                         continue;
883
884                 class_incref(obd, __FUNCTION__, obd);
885                 read_unlock(&obd_dev_lock);
886                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
887                                          sizeof(KEY_SPTLRPC_CONF),
888                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
889                 rc = rc ? rc : rc2;
890                 class_decref(obd, __FUNCTION__, obd);
891                 read_lock(&obd_dev_lock);
892         }
893         read_unlock(&obd_dev_lock);
894         return rc;
895 }
896 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
897
898 void obd_cleanup_caches(void)
899 {
900         ENTRY;
901         if (obd_device_cachep) {
902                 kmem_cache_destroy(obd_device_cachep);
903                 obd_device_cachep = NULL;
904         }
905         if (obdo_cachep) {
906                 kmem_cache_destroy(obdo_cachep);
907                 obdo_cachep = NULL;
908         }
909
910         EXIT;
911 }
912
913 int obd_init_caches(void)
914 {
915         int rc;
916         ENTRY;
917
918         LASSERT(obd_device_cachep == NULL);
919         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
920                                               sizeof(struct obd_device),
921                                               0, 0, NULL);
922         if (!obd_device_cachep)
923                 GOTO(out, rc = -ENOMEM);
924
925         LASSERT(obdo_cachep == NULL);
926         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
927                                         0, 0, NULL);
928         if (!obdo_cachep)
929                 GOTO(out, rc = -ENOMEM);
930
931         RETURN(0);
932 out:
933         obd_cleanup_caches();
934         RETURN(rc);
935 }
936
937 /* map connection to client */
938 struct obd_export *class_conn2export(struct lustre_handle *conn)
939 {
940         struct obd_export *export;
941         ENTRY;
942
943         if (!conn) {
944                 CDEBUG(D_CACHE, "looking for null handle\n");
945                 RETURN(NULL);
946         }
947
948         if (conn->cookie == -1) {  /* this means assign a new connection */
949                 CDEBUG(D_CACHE, "want a new connection\n");
950                 RETURN(NULL);
951         }
952
953         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
954         export = class_handle2object(conn->cookie, NULL);
955         RETURN(export);
956 }
957 EXPORT_SYMBOL(class_conn2export);
958
959 struct obd_device *class_exp2obd(struct obd_export *exp)
960 {
961         if (exp)
962                 return exp->exp_obd;
963         return NULL;
964 }
965 EXPORT_SYMBOL(class_exp2obd);
966
967 struct obd_device *class_conn2obd(struct lustre_handle *conn)
968 {
969         struct obd_export *export;
970         export = class_conn2export(conn);
971         if (export) {
972                 struct obd_device *obd = export->exp_obd;
973                 class_export_put(export);
974                 return obd;
975         }
976         return NULL;
977 }
978
979 struct obd_import *class_exp2cliimp(struct obd_export *exp)
980 {
981         struct obd_device *obd = exp->exp_obd;
982         if (obd == NULL)
983                 return NULL;
984         return obd->u.cli.cl_import;
985 }
986 EXPORT_SYMBOL(class_exp2cliimp);
987
988 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
989 {
990         struct obd_device *obd = class_conn2obd(conn);
991         if (obd == NULL)
992                 return NULL;
993         return obd->u.cli.cl_import;
994 }
995
996 /* Export management functions */
997 static void class_export_destroy(struct obd_export *exp)
998 {
999         struct obd_device *obd = exp->exp_obd;
1000         ENTRY;
1001
1002         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
1003         LASSERT(obd != NULL);
1004
1005         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
1006                exp->exp_client_uuid.uuid, obd->obd_name);
1007
1008         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
1009         if (exp->exp_connection)
1010                 ptlrpc_put_connection_superhack(exp->exp_connection);
1011
1012         LASSERT(list_empty(&exp->exp_outstanding_replies));
1013         LASSERT(list_empty(&exp->exp_uncommitted_replies));
1014         LASSERT(list_empty(&exp->exp_req_replay_queue));
1015         LASSERT(list_empty(&exp->exp_hp_rpcs));
1016         obd_destroy_export(exp);
1017         /* self export doesn't hold a reference to an obd, although it
1018          * exists until freeing of the obd */
1019         if (exp != obd->obd_self_export)
1020                 class_decref(obd, "export", exp);
1021
1022         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
1023         EXIT;
1024 }
1025
/* portals_handle_ops addref callback: take a reference on the export
 * passed as an opaque pointer. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
1030
1031 static struct portals_handle_ops export_handle_ops = {
1032         .hop_addref = export_handle_addref,
1033         .hop_free   = NULL,
1034 };
1035
1036 struct obd_export *class_export_get(struct obd_export *exp)
1037 {
1038         atomic_inc(&exp->exp_refcount);
1039         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1040                atomic_read(&exp->exp_refcount));
1041         return exp;
1042 }
1043 EXPORT_SYMBOL(class_export_get);
1044
1045 void class_export_put(struct obd_export *exp)
1046 {
1047         LASSERT(exp != NULL);
1048         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1049         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1050                atomic_read(&exp->exp_refcount) - 1);
1051
1052         if (atomic_dec_and_test(&exp->exp_refcount)) {
1053                 struct obd_device *obd = exp->exp_obd;
1054
1055                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1056                        exp, exp->exp_client_uuid.uuid);
1057
1058                 /* release nid stat refererence */
1059                 lprocfs_exp_cleanup(exp);
1060
1061                 if (exp == obd->obd_self_export) {
1062                         /* self export should be destroyed without
1063                          * zombie thread as it doesn't hold a
1064                          * reference to obd and doesn't hold any
1065                          * resources */
1066                         class_export_destroy(exp);
1067                         /* self export is destroyed, no class
1068                          * references exist and it is safe to free
1069                          * obd */
1070                         class_free_dev(obd);
1071                 } else {
1072                         LASSERT(!list_empty(&exp->exp_obd_chain));
1073                         obd_zombie_export_add(exp);
1074                 }
1075
1076         }
1077 }
1078 EXPORT_SYMBOL(class_export_put);
1079
1080 static void obd_zombie_exp_cull(struct work_struct *ws)
1081 {
1082         struct obd_export *export;
1083
1084         export = container_of(ws, struct obd_export, exp_zombie_work);
1085         class_export_destroy(export);
1086 }
1087
1088 /* Creates a new export, adds it to the hash table, and returns a
1089  * pointer to it. The refcount is 2: one for the hash reference, and
1090  * one for the pointer returned by this function. */
1091 struct obd_export *__class_new_export(struct obd_device *obd,
1092                                       struct obd_uuid *cluuid, bool is_self)
1093 {
1094         struct obd_export *export;
1095         struct cfs_hash *hash = NULL;
1096         int rc = 0;
1097         ENTRY;
1098
1099         OBD_ALLOC_PTR(export);
1100         if (!export)
1101                 return ERR_PTR(-ENOMEM);
1102
1103         export->exp_conn_cnt = 0;
1104         export->exp_lock_hash = NULL;
1105         export->exp_flock_hash = NULL;
1106         /* 2 = class_handle_hash + last */
1107         atomic_set(&export->exp_refcount, 2);
1108         atomic_set(&export->exp_rpc_count, 0);
1109         atomic_set(&export->exp_cb_count, 0);
1110         atomic_set(&export->exp_locks_count, 0);
1111 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1112         INIT_LIST_HEAD(&export->exp_locks_list);
1113         spin_lock_init(&export->exp_locks_list_guard);
1114 #endif
1115         atomic_set(&export->exp_replay_count, 0);
1116         export->exp_obd = obd;
1117         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1118         spin_lock_init(&export->exp_uncommitted_replies_lock);
1119         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1120         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1121         INIT_LIST_HEAD(&export->exp_handle.h_link);
1122         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1123         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1124         class_handle_hash(&export->exp_handle, &export_handle_ops);
1125         export->exp_last_request_time = ktime_get_real_seconds();
1126         spin_lock_init(&export->exp_lock);
1127         spin_lock_init(&export->exp_rpc_lock);
1128         INIT_HLIST_NODE(&export->exp_uuid_hash);
1129         INIT_HLIST_NODE(&export->exp_nid_hash);
1130         INIT_HLIST_NODE(&export->exp_gen_hash);
1131         spin_lock_init(&export->exp_bl_list_lock);
1132         INIT_LIST_HEAD(&export->exp_bl_list);
1133         INIT_LIST_HEAD(&export->exp_stale_list);
1134         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1135
1136         export->exp_sp_peer = LUSTRE_SP_ANY;
1137         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1138         export->exp_client_uuid = *cluuid;
1139         obd_init_export(export);
1140
1141         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1142                 spin_lock(&obd->obd_dev_lock);
1143                 /* shouldn't happen, but might race */
1144                 if (obd->obd_stopping)
1145                         GOTO(exit_unlock, rc = -ENODEV);
1146
1147                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1148                 if (hash == NULL)
1149                         GOTO(exit_unlock, rc = -ENODEV);
1150                 spin_unlock(&obd->obd_dev_lock);
1151
1152                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1153                 if (rc != 0) {
1154                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1155                                       obd->obd_name, cluuid->uuid, rc);
1156                         GOTO(exit_err, rc = -EALREADY);
1157                 }
1158         }
1159
1160         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1161         spin_lock(&obd->obd_dev_lock);
1162         if (obd->obd_stopping) {
1163                 if (hash)
1164                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1165                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1166         }
1167
1168         if (!is_self) {
1169                 class_incref(obd, "export", export);
1170                 list_add_tail(&export->exp_obd_chain_timed,
1171                               &obd->obd_exports_timed);
1172                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1173                 obd->obd_num_exports++;
1174         } else {
1175                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1176                 INIT_LIST_HEAD(&export->exp_obd_chain);
1177         }
1178         spin_unlock(&obd->obd_dev_lock);
1179         if (hash)
1180                 cfs_hash_putref(hash);
1181         RETURN(export);
1182
1183 exit_unlock:
1184         spin_unlock(&obd->obd_dev_lock);
1185 exit_err:
1186         if (hash)
1187                 cfs_hash_putref(hash);
1188         class_handle_unhash(&export->exp_handle);
1189         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1190         obd_destroy_export(export);
1191         OBD_FREE_PTR(export);
1192         return ERR_PTR(rc);
1193 }
1194
1195 struct obd_export *class_new_export(struct obd_device *obd,
1196                                     struct obd_uuid *uuid)
1197 {
1198         return __class_new_export(obd, uuid, false);
1199 }
1200 EXPORT_SYMBOL(class_new_export);
1201
1202 struct obd_export *class_new_export_self(struct obd_device *obd,
1203                                          struct obd_uuid *uuid)
1204 {
1205         return __class_new_export(obd, uuid, true);
1206 }
1207
1208 void class_unlink_export(struct obd_export *exp)
1209 {
1210         class_handle_unhash(&exp->exp_handle);
1211
1212         if (exp->exp_obd->obd_self_export == exp) {
1213                 class_export_put(exp);
1214                 return;
1215         }
1216
1217         spin_lock(&exp->exp_obd->obd_dev_lock);
1218         /* delete an uuid-export hashitem from hashtables */
1219         if (!hlist_unhashed(&exp->exp_uuid_hash))
1220                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1221                              &exp->exp_client_uuid,
1222                              &exp->exp_uuid_hash);
1223
1224 #ifdef HAVE_SERVER_SUPPORT
1225         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1226                 struct tg_export_data   *ted = &exp->exp_target_data;
1227                 struct cfs_hash         *hash;
1228
1229                 /* Because obd_gen_hash will not be released until
1230                  * class_cleanup(), so hash should never be NULL here */
1231                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1232                 LASSERT(hash != NULL);
1233                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1234                              &exp->exp_gen_hash);
1235                 cfs_hash_putref(hash);
1236         }
1237 #endif /* HAVE_SERVER_SUPPORT */
1238
1239         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1240         list_del_init(&exp->exp_obd_chain_timed);
1241         exp->exp_obd->obd_num_exports--;
1242         spin_unlock(&exp->exp_obd->obd_dev_lock);
1243         atomic_inc(&obd_stale_export_num);
1244
1245         /* A reference is kept by obd_stale_exports list */
1246         obd_stale_export_put(exp);
1247 }
1248 EXPORT_SYMBOL(class_unlink_export);
1249
1250 /* Import management functions */
1251 static void class_import_destroy(struct obd_import *imp)
1252 {
1253         ENTRY;
1254
1255         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1256                 imp->imp_obd->obd_name);
1257
1258         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1259
1260         ptlrpc_put_connection_superhack(imp->imp_connection);
1261
1262         while (!list_empty(&imp->imp_conn_list)) {
1263                 struct obd_import_conn *imp_conn;
1264
1265                 imp_conn = list_entry(imp->imp_conn_list.next,
1266                                       struct obd_import_conn, oic_item);
1267                 list_del_init(&imp_conn->oic_item);
1268                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1269                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1270         }
1271
1272         LASSERT(imp->imp_sec == NULL);
1273         class_decref(imp->imp_obd, "import", imp);
1274         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1275         EXIT;
1276 }
1277
/* portals_handle_ops addref callback: take a reference on the import
 * passed as an opaque pointer. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
1282
1283 static struct portals_handle_ops import_handle_ops = {
1284         .hop_addref = import_handle_addref,
1285         .hop_free   = NULL,
1286 };
1287
1288 struct obd_import *class_import_get(struct obd_import *import)
1289 {
1290         atomic_inc(&import->imp_refcount);
1291         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1292                atomic_read(&import->imp_refcount),
1293                import->imp_obd->obd_name);
1294         return import;
1295 }
1296 EXPORT_SYMBOL(class_import_get);
1297
1298 void class_import_put(struct obd_import *imp)
1299 {
1300         ENTRY;
1301
1302         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1303
1304         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1305                atomic_read(&imp->imp_refcount) - 1,
1306                imp->imp_obd->obd_name);
1307
1308         if (atomic_dec_and_test(&imp->imp_refcount)) {
1309                 CDEBUG(D_INFO, "final put import %p\n", imp);
1310                 obd_zombie_import_add(imp);
1311         }
1312
1313         /* catch possible import put race */
1314         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1315         EXIT;
1316 }
1317 EXPORT_SYMBOL(class_import_put);
1318
1319 static void init_imp_at(struct imp_at *at) {
1320         int i;
1321         at_init(&at->iat_net_latency, 0, 0);
1322         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1323                 /* max service estimates are tracked on the server side, so
1324                    don't use the AT history here, just use the last reported
1325                    val. (But keep hist for proc histogram, worst_ever) */
1326                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1327                         AT_FLG_NOHIST);
1328         }
1329 }
1330
1331 static void obd_zombie_imp_cull(struct work_struct *ws)
1332 {
1333         struct obd_import *import;
1334
1335         import = container_of(ws, struct obd_import, imp_zombie_work);
1336         class_import_destroy(import);
1337 }
1338
1339 struct obd_import *class_new_import(struct obd_device *obd)
1340 {
1341         struct obd_import *imp;
1342         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1343
1344         OBD_ALLOC(imp, sizeof(*imp));
1345         if (imp == NULL)
1346                 return NULL;
1347
1348         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1349         INIT_LIST_HEAD(&imp->imp_replay_list);
1350         INIT_LIST_HEAD(&imp->imp_sending_list);
1351         INIT_LIST_HEAD(&imp->imp_delayed_list);
1352         INIT_LIST_HEAD(&imp->imp_committed_list);
1353         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1354         imp->imp_known_replied_xid = 0;
1355         imp->imp_replay_cursor = &imp->imp_committed_list;
1356         spin_lock_init(&imp->imp_lock);
1357         imp->imp_last_success_conn = 0;
1358         imp->imp_state = LUSTRE_IMP_NEW;
1359         imp->imp_obd = class_incref(obd, "import", imp);
1360         mutex_init(&imp->imp_sec_mutex);
1361         init_waitqueue_head(&imp->imp_recovery_waitq);
1362         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1363
1364         if (curr_pid_ns->child_reaper)
1365                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1366         else
1367                 imp->imp_sec_refpid = 1;
1368
1369         atomic_set(&imp->imp_refcount, 2);
1370         atomic_set(&imp->imp_unregistering, 0);
1371         atomic_set(&imp->imp_inflight, 0);
1372         atomic_set(&imp->imp_replay_inflight, 0);
1373         atomic_set(&imp->imp_inval_count, 0);
1374         INIT_LIST_HEAD(&imp->imp_conn_list);
1375         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1376         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1377         init_imp_at(&imp->imp_at);
1378
1379         /* the default magic is V2, will be used in connect RPC, and
1380          * then adjusted according to the flags in request/reply. */
1381         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1382
1383         return imp;
1384 }
1385 EXPORT_SYMBOL(class_new_import);
1386
1387 void class_destroy_import(struct obd_import *import)
1388 {
1389         LASSERT(import != NULL);
1390         LASSERT(import != LP_POISON);
1391
1392         class_handle_unhash(&import->imp_handle);
1393
1394         spin_lock(&import->imp_lock);
1395         import->imp_generation++;
1396         spin_unlock(&import->imp_lock);
1397         class_import_put(import);
1398 }
1399 EXPORT_SYMBOL(class_destroy_import);
1400
1401 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1402
1403 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1404 {
1405         spin_lock(&exp->exp_locks_list_guard);
1406
1407         LASSERT(lock->l_exp_refs_nr >= 0);
1408
1409         if (lock->l_exp_refs_target != NULL &&
1410             lock->l_exp_refs_target != exp) {
1411                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1412                               exp, lock, lock->l_exp_refs_target);
1413         }
1414         if ((lock->l_exp_refs_nr ++) == 0) {
1415                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1416                 lock->l_exp_refs_target = exp;
1417         }
1418         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1419                lock, exp, lock->l_exp_refs_nr);
1420         spin_unlock(&exp->exp_locks_list_guard);
1421 }
1422 EXPORT_SYMBOL(__class_export_add_lock_ref);
1423
1424 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1425 {
1426         spin_lock(&exp->exp_locks_list_guard);
1427         LASSERT(lock->l_exp_refs_nr > 0);
1428         if (lock->l_exp_refs_target != exp) {
1429                 LCONSOLE_WARN("lock %p, "
1430                               "mismatching export pointers: %p, %p\n",
1431                               lock, lock->l_exp_refs_target, exp);
1432         }
1433         if (-- lock->l_exp_refs_nr == 0) {
1434                 list_del_init(&lock->l_exp_refs_link);
1435                 lock->l_exp_refs_target = NULL;
1436         }
1437         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1438                lock, exp, lock->l_exp_refs_nr);
1439         spin_unlock(&exp->exp_locks_list_guard);
1440 }
1441 EXPORT_SYMBOL(__class_export_del_lock_ref);
1442 #endif
1443
1444 /* A connection defines an export context in which preallocation can
1445    be managed. This releases the export pointer reference, and returns
1446    the export handle, so the export refcount is 1 when this function
1447    returns. */
1448 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1449                   struct obd_uuid *cluuid)
1450 {
1451         struct obd_export *export;
1452         LASSERT(conn != NULL);
1453         LASSERT(obd != NULL);
1454         LASSERT(cluuid != NULL);
1455         ENTRY;
1456
1457         export = class_new_export(obd, cluuid);
1458         if (IS_ERR(export))
1459                 RETURN(PTR_ERR(export));
1460
1461         conn->cookie = export->exp_handle.h_cookie;
1462         class_export_put(export);
1463
1464         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1465                cluuid->uuid, conn->cookie);
1466         RETURN(0);
1467 }
1468 EXPORT_SYMBOL(class_connect);
1469
1470 /* if export is involved in recovery then clean up related things */
1471 static void class_export_recovery_cleanup(struct obd_export *exp)
1472 {
1473         struct obd_device *obd = exp->exp_obd;
1474
1475         spin_lock(&obd->obd_recovery_task_lock);
1476         if (obd->obd_recovering) {
1477                 if (exp->exp_in_recovery) {
1478                         spin_lock(&exp->exp_lock);
1479                         exp->exp_in_recovery = 0;
1480                         spin_unlock(&exp->exp_lock);
1481                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1482                         atomic_dec(&obd->obd_connected_clients);
1483                 }
1484
1485                 /* if called during recovery then should update
1486                  * obd_stale_clients counter,
1487                  * lightweight exports are not counted */
1488                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1489                         exp->exp_obd->obd_stale_clients++;
1490         }
1491         spin_unlock(&obd->obd_recovery_task_lock);
1492
1493         spin_lock(&exp->exp_lock);
1494         /** Cleanup req replay fields */
1495         if (exp->exp_req_replay_needed) {
1496                 exp->exp_req_replay_needed = 0;
1497
1498                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1499                 atomic_dec(&obd->obd_req_replay_clients);
1500         }
1501
1502         /** Cleanup lock replay data */
1503         if (exp->exp_lock_replay_needed) {
1504                 exp->exp_lock_replay_needed = 0;
1505
1506                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1507                 atomic_dec(&obd->obd_lock_replay_clients);
1508         }
1509         spin_unlock(&exp->exp_lock);
1510 }
1511
1512 /* This function removes 1-3 references from the export:
1513  * 1 - for export pointer passed
1514  * and if disconnect really need
1515  * 2 - removing from hash
1516  * 3 - in client_unlink_export
1517  * The export pointer passed to this function can destroyed */
1518 int class_disconnect(struct obd_export *export)
1519 {
1520         int already_disconnected;
1521         ENTRY;
1522
1523         if (export == NULL) {
1524                 CWARN("attempting to free NULL export %p\n", export);
1525                 RETURN(-EINVAL);
1526         }
1527
1528         spin_lock(&export->exp_lock);
1529         already_disconnected = export->exp_disconnected;
1530         export->exp_disconnected = 1;
1531         /*  We hold references of export for uuid hash
1532          *  and nid_hash and export link at least. So
1533          *  it is safe to call cfs_hash_del in there.  */
1534         if (!hlist_unhashed(&export->exp_nid_hash))
1535                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1536                              &export->exp_connection->c_peer.nid,
1537                              &export->exp_nid_hash);
1538         spin_unlock(&export->exp_lock);
1539
1540         /* class_cleanup(), abort_recovery(), and class_fail_export()
1541          * all end up in here, and if any of them race we shouldn't
1542          * call extra class_export_puts(). */
1543         if (already_disconnected) {
1544                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1545                 GOTO(no_disconn, already_disconnected);
1546         }
1547
1548         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1549                export->exp_handle.h_cookie);
1550
1551         class_export_recovery_cleanup(export);
1552         class_unlink_export(export);
1553 no_disconn:
1554         class_export_put(export);
1555         RETURN(0);
1556 }
1557 EXPORT_SYMBOL(class_disconnect);
1558
1559 /* Return non-zero for a fully connected export */
1560 int class_connected_export(struct obd_export *exp)
1561 {
1562         int connected = 0;
1563
1564         if (exp) {
1565                 spin_lock(&exp->exp_lock);
1566                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1567                 spin_unlock(&exp->exp_lock);
1568         }
1569         return connected;
1570 }
1571 EXPORT_SYMBOL(class_connected_export);
1572
1573 static void class_disconnect_export_list(struct list_head *list,
1574                                          enum obd_option flags)
1575 {
1576         int rc;
1577         struct obd_export *exp;
1578         ENTRY;
1579
1580         /* It's possible that an export may disconnect itself, but
1581          * nothing else will be added to this list. */
1582         while (!list_empty(list)) {
1583                 exp = list_entry(list->next, struct obd_export,
1584                                  exp_obd_chain);
1585                 /* need for safe call CDEBUG after obd_disconnect */
1586                 class_export_get(exp);
1587
1588                 spin_lock(&exp->exp_lock);
1589                 exp->exp_flags = flags;
1590                 spin_unlock(&exp->exp_lock);
1591
1592                 if (obd_uuid_equals(&exp->exp_client_uuid,
1593                                     &exp->exp_obd->obd_uuid)) {
1594                         CDEBUG(D_HA,
1595                                "exp %p export uuid == obd uuid, don't discon\n",
1596                                exp);
1597                         /* Need to delete this now so we don't end up pointing
1598                          * to work_list later when this export is cleaned up. */
1599                         list_del_init(&exp->exp_obd_chain);
1600                         class_export_put(exp);
1601                         continue;
1602                 }
1603
1604                 class_export_get(exp);
1605                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1606                        "last request at %lld\n",
1607                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1608                        exp, exp->exp_last_request_time);
1609                 /* release one export reference anyway */
1610                 rc = obd_disconnect(exp);
1611
1612                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1613                        obd_export_nid2str(exp), exp, rc);
1614                 class_export_put(exp);
1615         }
1616         EXIT;
1617 }
1618
1619 void class_disconnect_exports(struct obd_device *obd)
1620 {
1621         struct list_head work_list;
1622         ENTRY;
1623
1624         /* Move all of the exports from obd_exports to a work list, en masse. */
1625         INIT_LIST_HEAD(&work_list);
1626         spin_lock(&obd->obd_dev_lock);
1627         list_splice_init(&obd->obd_exports, &work_list);
1628         list_splice_init(&obd->obd_delayed_exports, &work_list);
1629         spin_unlock(&obd->obd_dev_lock);
1630
1631         if (!list_empty(&work_list)) {
1632                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1633                        "disconnecting them\n", obd->obd_minor, obd);
1634                 class_disconnect_export_list(&work_list,
1635                                              exp_flags_from_obd(obd));
1636         } else
1637                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1638                        obd->obd_minor, obd);
1639         EXIT;
1640 }
1641 EXPORT_SYMBOL(class_disconnect_exports);
1642
1643 /* Remove exports that have not completed recovery.
1644  */
1645 void class_disconnect_stale_exports(struct obd_device *obd,
1646                                     int (*test_export)(struct obd_export *))
1647 {
1648         struct list_head work_list;
1649         struct obd_export *exp, *n;
1650         int evicted = 0;
1651         ENTRY;
1652
1653         INIT_LIST_HEAD(&work_list);
1654         spin_lock(&obd->obd_dev_lock);
1655         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1656                                  exp_obd_chain) {
1657                 /* don't count self-export as client */
1658                 if (obd_uuid_equals(&exp->exp_client_uuid,
1659                                     &exp->exp_obd->obd_uuid))
1660                         continue;
1661
1662                 /* don't evict clients which have no slot in last_rcvd
1663                  * (e.g. lightweight connection) */
1664                 if (exp->exp_target_data.ted_lr_idx == -1)
1665                         continue;
1666
1667                 spin_lock(&exp->exp_lock);
1668                 if (exp->exp_failed || test_export(exp)) {
1669                         spin_unlock(&exp->exp_lock);
1670                         continue;
1671                 }
1672                 exp->exp_failed = 1;
1673                 spin_unlock(&exp->exp_lock);
1674
1675                 list_move(&exp->exp_obd_chain, &work_list);
1676                 evicted++;
1677                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1678                        obd->obd_name, exp->exp_client_uuid.uuid,
1679                        obd_export_nid2str(exp));
1680                 print_export_data(exp, "EVICTING", 0, D_HA);
1681         }
1682         spin_unlock(&obd->obd_dev_lock);
1683
1684         if (evicted)
1685                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1686                               obd->obd_name, evicted);
1687
1688         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1689                                                  OBD_OPT_ABORT_RECOV);
1690         EXIT;
1691 }
1692 EXPORT_SYMBOL(class_disconnect_stale_exports);
1693
1694 void class_fail_export(struct obd_export *exp)
1695 {
1696         int rc, already_failed;
1697
1698         spin_lock(&exp->exp_lock);
1699         already_failed = exp->exp_failed;
1700         exp->exp_failed = 1;
1701         spin_unlock(&exp->exp_lock);
1702
1703         if (already_failed) {
1704                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1705                        exp, exp->exp_client_uuid.uuid);
1706                 return;
1707         }
1708
1709         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1710                exp, exp->exp_client_uuid.uuid);
1711
1712         if (obd_dump_on_timeout)
1713                 libcfs_debug_dumplog();
1714
1715         /* need for safe call CDEBUG after obd_disconnect */
1716         class_export_get(exp);
1717
1718         /* Most callers into obd_disconnect are removing their own reference
1719          * (request, for example) in addition to the one from the hash table.
1720          * We don't have such a reference here, so make one. */
1721         class_export_get(exp);
1722         rc = obd_disconnect(exp);
1723         if (rc)
1724                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1725         else
1726                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1727                        exp, exp->exp_client_uuid.uuid);
1728         class_export_put(exp);
1729 }
1730 EXPORT_SYMBOL(class_fail_export);
1731
1732 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1733 {
1734         struct cfs_hash *nid_hash;
1735         struct obd_export *doomed_exp = NULL;
1736         int exports_evicted = 0;
1737
1738         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1739
1740         spin_lock(&obd->obd_dev_lock);
1741         /* umount has run already, so evict thread should leave
1742          * its task to umount thread now */
1743         if (obd->obd_stopping) {
1744                 spin_unlock(&obd->obd_dev_lock);
1745                 return exports_evicted;
1746         }
1747         nid_hash = obd->obd_nid_hash;
1748         cfs_hash_getref(nid_hash);
1749         spin_unlock(&obd->obd_dev_lock);
1750
1751         do {
1752                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1753                 if (doomed_exp == NULL)
1754                         break;
1755
1756                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1757                          "nid %s found, wanted nid %s, requested nid %s\n",
1758                          obd_export_nid2str(doomed_exp),
1759                          libcfs_nid2str(nid_key), nid);
1760                 LASSERTF(doomed_exp != obd->obd_self_export,
1761                          "self-export is hashed by NID?\n");
1762                 exports_evicted++;
1763                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1764                               "request\n", obd->obd_name,
1765                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1766                               obd_export_nid2str(doomed_exp));
1767                 class_fail_export(doomed_exp);
1768                 class_export_put(doomed_exp);
1769         } while (1);
1770
1771         cfs_hash_putref(nid_hash);
1772
1773         if (!exports_evicted)
1774                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1775                        obd->obd_name, nid);
1776         return exports_evicted;
1777 }
1778 EXPORT_SYMBOL(obd_export_evict_by_nid);
1779
1780 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1781 {
1782         struct cfs_hash *uuid_hash;
1783         struct obd_export *doomed_exp = NULL;
1784         struct obd_uuid doomed_uuid;
1785         int exports_evicted = 0;
1786
1787         spin_lock(&obd->obd_dev_lock);
1788         if (obd->obd_stopping) {
1789                 spin_unlock(&obd->obd_dev_lock);
1790                 return exports_evicted;
1791         }
1792         uuid_hash = obd->obd_uuid_hash;
1793         cfs_hash_getref(uuid_hash);
1794         spin_unlock(&obd->obd_dev_lock);
1795
1796         obd_str2uuid(&doomed_uuid, uuid);
1797         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1798                 CERROR("%s: can't evict myself\n", obd->obd_name);
1799                 cfs_hash_putref(uuid_hash);
1800                 return exports_evicted;
1801         }
1802
1803         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1804
1805         if (doomed_exp == NULL) {
1806                 CERROR("%s: can't disconnect %s: no exports found\n",
1807                        obd->obd_name, uuid);
1808         } else {
1809                 CWARN("%s: evicting %s at adminstrative request\n",
1810                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1811                 class_fail_export(doomed_exp);
1812                 class_export_put(doomed_exp);
1813                 exports_evicted++;
1814         }
1815         cfs_hash_putref(uuid_hash);
1816
1817         return exports_evicted;
1818 }
1819
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback used by print_export_data() when lock dumping is
 * requested. It starts out NULL (file-scope objects are zero-initialised),
 * in which case no callback is made. */
void (*class_export_dump_hook)(struct obd_export *exp);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1824
1825 static void print_export_data(struct obd_export *exp, const char *status,
1826                               int locks, int debug_level)
1827 {
1828         struct ptlrpc_reply_state *rs;
1829         struct ptlrpc_reply_state *first_reply = NULL;
1830         int nreplies = 0;
1831
1832         spin_lock(&exp->exp_lock);
1833         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1834                             rs_exp_list) {
1835                 if (nreplies == 0)
1836                         first_reply = rs;
1837                 nreplies++;
1838         }
1839         spin_unlock(&exp->exp_lock);
1840
1841         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1842                "%p %s %llu stale:%d\n",
1843                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1844                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1845                atomic_read(&exp->exp_rpc_count),
1846                atomic_read(&exp->exp_cb_count),
1847                atomic_read(&exp->exp_locks_count),
1848                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1849                nreplies, first_reply, nreplies > 3 ? "..." : "",
1850                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1851 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1852         if (locks && class_export_dump_hook != NULL)
1853                 class_export_dump_hook(exp);
1854 #endif
1855 }
1856
1857 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1858 {
1859         struct obd_export *exp;
1860
1861         spin_lock(&obd->obd_dev_lock);
1862         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1863                 print_export_data(exp, "ACTIVE", locks, debug_level);
1864         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1865                 print_export_data(exp, "UNLINKED", locks, debug_level);
1866         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1867                 print_export_data(exp, "DELAYED", locks, debug_level);
1868         spin_unlock(&obd->obd_dev_lock);
1869 }
1870
1871 void obd_exports_barrier(struct obd_device *obd)
1872 {
1873         int waited = 2;
1874         LASSERT(list_empty(&obd->obd_exports));
1875         spin_lock(&obd->obd_dev_lock);
1876         while (!list_empty(&obd->obd_unlinked_exports)) {
1877                 spin_unlock(&obd->obd_dev_lock);
1878                 set_current_state(TASK_UNINTERRUPTIBLE);
1879                 schedule_timeout(cfs_time_seconds(waited));
1880                 if (waited > 5 && is_power_of_2(waited)) {
1881                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1882                                       "more than %d seconds. "
1883                                       "The obd refcount = %d. Is it stuck?\n",
1884                                       obd->obd_name, waited,
1885                                       atomic_read(&obd->obd_refcount));
1886                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1887                 }
1888                 waited *= 2;
1889                 spin_lock(&obd->obd_dev_lock);
1890         }
1891         spin_unlock(&obd->obd_dev_lock);
1892 }
1893 EXPORT_SYMBOL(obd_exports_barrier);
1894
1895 /**
1896  * Add export to the obd_zombe thread and notify it.
1897  */
1898 static void obd_zombie_export_add(struct obd_export *exp) {
1899         atomic_dec(&obd_stale_export_num);
1900         spin_lock(&exp->exp_obd->obd_dev_lock);
1901         LASSERT(!list_empty(&exp->exp_obd_chain));
1902         list_del_init(&exp->exp_obd_chain);
1903         spin_unlock(&exp->exp_obd->obd_dev_lock);
1904
1905         queue_work(zombie_wq, &exp->exp_zombie_work);
1906 }
1907
1908 /**
1909  * Add import to the obd_zombe thread and notify it.
1910  */
1911 static void obd_zombie_import_add(struct obd_import *imp) {
1912         LASSERT(imp->imp_sec == NULL);
1913
1914         queue_work(zombie_wq, &imp->imp_zombie_work);
1915 }
1916
1917 /**
1918  * wait when obd_zombie import/export queues become empty
1919  */
1920 void obd_zombie_barrier(void)
1921 {
1922         flush_workqueue(zombie_wq);
1923 }
1924 EXPORT_SYMBOL(obd_zombie_barrier);
1925
1926
1927 struct obd_export *obd_stale_export_get(void)
1928 {
1929         struct obd_export *exp = NULL;
1930         ENTRY;
1931
1932         spin_lock(&obd_stale_export_lock);
1933         if (!list_empty(&obd_stale_exports)) {
1934                 exp = list_entry(obd_stale_exports.next,
1935                                  struct obd_export, exp_stale_list);
1936                 list_del_init(&exp->exp_stale_list);
1937         }
1938         spin_unlock(&obd_stale_export_lock);
1939
1940         if (exp) {
1941                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1942                        atomic_read(&obd_stale_export_num));
1943         }
1944         RETURN(exp);
1945 }
1946 EXPORT_SYMBOL(obd_stale_export_get);
1947
1948 void obd_stale_export_put(struct obd_export *exp)
1949 {
1950         ENTRY;
1951
1952         LASSERT(list_empty(&exp->exp_stale_list));
1953         if (exp->exp_lock_hash &&
1954             atomic_read(&exp->exp_lock_hash->hs_count)) {
1955                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1956                        atomic_read(&obd_stale_export_num));
1957
1958                 spin_lock_bh(&exp->exp_bl_list_lock);
1959                 spin_lock(&obd_stale_export_lock);
1960                 /* Add to the tail if there is no blocked locks,
1961                  * to the head otherwise. */
1962                 if (list_empty(&exp->exp_bl_list))
1963                         list_add_tail(&exp->exp_stale_list,
1964                                       &obd_stale_exports);
1965                 else
1966                         list_add(&exp->exp_stale_list,
1967                                  &obd_stale_exports);
1968
1969                 spin_unlock(&obd_stale_export_lock);
1970                 spin_unlock_bh(&exp->exp_bl_list_lock);
1971         } else {
1972                 class_export_put(exp);
1973         }
1974         EXIT;
1975 }
1976 EXPORT_SYMBOL(obd_stale_export_put);
1977
1978 /**
1979  * Adjust the position of the export in the stale list,
1980  * i.e. move to the head of the list if is needed.
1981  **/
1982 void obd_stale_export_adjust(struct obd_export *exp)
1983 {
1984         LASSERT(exp != NULL);
1985         spin_lock_bh(&exp->exp_bl_list_lock);
1986         spin_lock(&obd_stale_export_lock);
1987
1988         if (!list_empty(&exp->exp_stale_list) &&
1989             !list_empty(&exp->exp_bl_list))
1990                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1991
1992         spin_unlock(&obd_stale_export_lock);
1993         spin_unlock_bh(&exp->exp_bl_list_lock);
1994 }
1995 EXPORT_SYMBOL(obd_stale_export_adjust);
1996
1997 /**
1998  * start destroy zombie import/export thread
1999  */
2000 int obd_zombie_impexp_init(void)
2001 {
2002         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
2003         if (!zombie_wq)
2004                 return -ENOMEM;
2005
2006         return 0;
2007 }
2008
2009 /**
2010  * stop destroy zombie import/export thread
2011  */
2012 void obd_zombie_impexp_stop(void)
2013 {
2014         destroy_workqueue(zombie_wq);
2015         LASSERT(list_empty(&obd_stale_exports));
2016 }
2017
2018 /***** Kernel-userspace comm helpers *******/
2019
2020 /* Get length of entire message, including header */
2021 int kuc_len(int payload_len)
2022 {
2023         return sizeof(struct kuc_hdr) + payload_len;
2024 }
2025 EXPORT_SYMBOL(kuc_len);
2026
2027 /* Get a pointer to kuc header, given a ptr to the payload
2028  * @param p Pointer to payload area
2029  * @returns Pointer to kuc header
2030  */
2031 struct kuc_hdr * kuc_ptr(void *p)
2032 {
2033         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2034         LASSERT(lh->kuc_magic == KUC_MAGIC);
2035         return lh;
2036 }
2037 EXPORT_SYMBOL(kuc_ptr);
2038
2039 /* Alloc space for a message, and fill in header
2040  * @return Pointer to payload area
2041  */
2042 void *kuc_alloc(int payload_len, int transport, int type)
2043 {
2044         struct kuc_hdr *lh;
2045         int len = kuc_len(payload_len);
2046
2047         OBD_ALLOC(lh, len);
2048         if (lh == NULL)
2049                 return ERR_PTR(-ENOMEM);
2050
2051         lh->kuc_magic = KUC_MAGIC;
2052         lh->kuc_transport = transport;
2053         lh->kuc_msgtype = type;
2054         lh->kuc_msglen = len;
2055
2056         return (void *)(lh + 1);
2057 }
2058 EXPORT_SYMBOL(kuc_alloc);
2059
/* Free a message obtained from kuc_alloc().
 * @param p Pointer to payload area (not to the header)
 * @param payload_len Payload size that was passed to kuc_alloc()
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *hdr = kuc_ptr(p);

        OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
2067
2068 struct obd_request_slot_waiter {
2069         struct list_head        orsw_entry;
2070         wait_queue_head_t       orsw_waitq;
2071         bool                    orsw_signaled;
2072 };
2073
2074 static bool obd_request_slot_avail(struct client_obd *cli,
2075                                    struct obd_request_slot_waiter *orsw)
2076 {
2077         bool avail;
2078
2079         spin_lock(&cli->cl_loi_list_lock);
2080         avail = !!list_empty(&orsw->orsw_entry);
2081         spin_unlock(&cli->cl_loi_list_lock);
2082
2083         return avail;
2084 };
2085
2086 /*
2087  * For network flow control, the RPC sponsor needs to acquire a credit
2088  * before sending the RPC. The credits count for a connection is defined
2089  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2090  * the subsequent RPC sponsors need to wait until others released their
2091  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2092  */
2093 int obd_get_request_slot(struct client_obd *cli)
2094 {
2095         struct obd_request_slot_waiter   orsw;
2096         struct l_wait_info               lwi;
2097         int                              rc;
2098
2099         spin_lock(&cli->cl_loi_list_lock);
2100         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2101                 cli->cl_rpcs_in_flight++;
2102                 spin_unlock(&cli->cl_loi_list_lock);
2103                 return 0;
2104         }
2105
2106         init_waitqueue_head(&orsw.orsw_waitq);
2107         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2108         orsw.orsw_signaled = false;
2109         spin_unlock(&cli->cl_loi_list_lock);
2110
2111         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2112         rc = l_wait_event(orsw.orsw_waitq,
2113                           obd_request_slot_avail(cli, &orsw) ||
2114                           orsw.orsw_signaled,
2115                           &lwi);
2116
2117         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2118          * freed but other (such as obd_put_request_slot) is using it. */
2119         spin_lock(&cli->cl_loi_list_lock);
2120         if (rc != 0) {
2121                 if (!orsw.orsw_signaled) {
2122                         if (list_empty(&orsw.orsw_entry))
2123                                 cli->cl_rpcs_in_flight--;
2124                         else
2125                                 list_del(&orsw.orsw_entry);
2126                 }
2127         }
2128
2129         if (orsw.orsw_signaled) {
2130                 LASSERT(list_empty(&orsw.orsw_entry));
2131
2132                 rc = -EINTR;
2133         }
2134         spin_unlock(&cli->cl_loi_list_lock);
2135
2136         return rc;
2137 }
2138 EXPORT_SYMBOL(obd_get_request_slot);
2139
2140 void obd_put_request_slot(struct client_obd *cli)
2141 {
2142         struct obd_request_slot_waiter *orsw;
2143
2144         spin_lock(&cli->cl_loi_list_lock);
2145         cli->cl_rpcs_in_flight--;
2146
2147         /* If there is free slot, wakeup the first waiter. */
2148         if (!list_empty(&cli->cl_flight_waiters) &&
2149             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2150                 orsw = list_entry(cli->cl_flight_waiters.next,
2151                                   struct obd_request_slot_waiter, orsw_entry);
2152                 list_del_init(&orsw->orsw_entry);
2153                 cli->cl_rpcs_in_flight++;
2154                 wake_up(&orsw->orsw_waitq);
2155         }
2156         spin_unlock(&cli->cl_loi_list_lock);
2157 }
2158 EXPORT_SYMBOL(obd_put_request_slot);
2159
2160 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2161 {
2162         return cli->cl_max_rpcs_in_flight;
2163 }
2164 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2165
2166 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2167 {
2168         struct obd_request_slot_waiter *orsw;
2169         __u32                           old;
2170         int                             diff;
2171         int                             i;
2172         char                            *typ_name;
2173         int                             rc;
2174
2175         if (max > OBD_MAX_RIF_MAX || max < 1)
2176                 return -ERANGE;
2177
2178         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2179         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2180                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2181                  * strictly lower that max_rpcs_in_flight */
2182                 if (max < 2) {
2183                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2184                                "because it must be higher than "
2185                                "max_mod_rpcs_in_flight value",
2186                                cli->cl_import->imp_obd->obd_name);
2187                         return -ERANGE;
2188                 }
2189                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2190                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2191                         if (rc != 0)
2192                                 return rc;
2193                 }
2194         }
2195
2196         spin_lock(&cli->cl_loi_list_lock);
2197         old = cli->cl_max_rpcs_in_flight;
2198         cli->cl_max_rpcs_in_flight = max;
2199         client_adjust_max_dirty(cli);
2200
2201         diff = max - old;
2202
2203         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2204         for (i = 0; i < diff; i++) {
2205                 if (list_empty(&cli->cl_flight_waiters))
2206                         break;
2207
2208                 orsw = list_entry(cli->cl_flight_waiters.next,
2209                                   struct obd_request_slot_waiter, orsw_entry);
2210                 list_del_init(&orsw->orsw_entry);
2211                 cli->cl_rpcs_in_flight++;
2212                 wake_up(&orsw->orsw_waitq);
2213         }
2214         spin_unlock(&cli->cl_loi_list_lock);
2215
2216         return 0;
2217 }
2218 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2219
2220 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2221 {
2222         return cli->cl_max_mod_rpcs_in_flight;
2223 }
2224 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2225
2226 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2227 {
2228         struct obd_connect_data *ocd;
2229         __u16 maxmodrpcs;
2230         __u16 prev;
2231
2232         if (max > OBD_MAX_RIF_MAX || max < 1)
2233                 return -ERANGE;
2234
2235         /* cannot exceed or equal max_rpcs_in_flight */
2236         if (max >= cli->cl_max_rpcs_in_flight) {
2237                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2238                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2239                        cli->cl_import->imp_obd->obd_name,
2240                        max, cli->cl_max_rpcs_in_flight);
2241                 return -ERANGE;
2242         }
2243
2244         /* cannot exceed max modify RPCs in flight supported by the server */
2245         ocd = &cli->cl_import->imp_connect_data;
2246         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2247                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2248         else
2249                 maxmodrpcs = 1;
2250         if (max > maxmodrpcs) {
2251                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2252                        "higher than max_mod_rpcs_per_client value (%hu) "
2253                        "returned by the server at connection\n",
2254                        cli->cl_import->imp_obd->obd_name,
2255                        max, maxmodrpcs);
2256                 return -ERANGE;
2257         }
2258
2259         spin_lock(&cli->cl_mod_rpcs_lock);
2260
2261         prev = cli->cl_max_mod_rpcs_in_flight;
2262         cli->cl_max_mod_rpcs_in_flight = max;
2263
2264         /* wakeup waiters if limit has been increased */
2265         if (cli->cl_max_mod_rpcs_in_flight > prev)
2266                 wake_up(&cli->cl_mod_rpcs_waitq);
2267
2268         spin_unlock(&cli->cl_mod_rpcs_lock);
2269
2270         return 0;
2271 }
2272 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2273
2274
2275 #define pct(a, b) (b ? a * 100 / b : 0)
2276 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2277                                struct seq_file *seq)
2278 {
2279         unsigned long mod_tot = 0, mod_cum;
2280         struct timespec64 now;
2281         int i;
2282
2283         ktime_get_real_ts64(&now);
2284
2285         spin_lock(&cli->cl_mod_rpcs_lock);
2286
2287         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2288                    (s64)now.tv_sec, now.tv_nsec);
2289         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2290                    cli->cl_mod_rpcs_in_flight);
2291
2292         seq_printf(seq, "\n\t\t\tmodify\n");
2293         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2294
2295         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2296
2297         mod_cum = 0;
2298         for (i = 0; i < OBD_HIST_MAX; i++) {
2299                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2300                 mod_cum += mod;
2301                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2302                            i, mod, pct(mod, mod_tot),
2303                            pct(mod_cum, mod_tot));
2304                 if (mod_cum == mod_tot)
2305                         break;
2306         }
2307
2308         spin_unlock(&cli->cl_mod_rpcs_lock);
2309
2310         return 0;
2311 }
2312 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2313 #undef pct
2314
2315
2316 /* The number of modify RPCs sent in parallel is limited
2317  * because the server has a finite number of slots per client to
2318  * store request result and ensure reply reconstruction when needed.
2319  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2320  * that takes into account server limit and cl_max_rpcs_in_flight
2321  * value.
2322  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2323  * one close request is allowed above the maximum.
2324  */
2325 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2326                                                  bool close_req)
2327 {
2328         bool avail;
2329
2330         /* A slot is available if
2331          * - number of modify RPCs in flight is less than the max
2332          * - it's a close RPC and no other close request is in flight
2333          */
2334         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2335                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2336
2337         return avail;
2338 }
2339
2340 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2341                                          bool close_req)
2342 {
2343         bool avail;
2344
2345         spin_lock(&cli->cl_mod_rpcs_lock);
2346         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2347         spin_unlock(&cli->cl_mod_rpcs_lock);
2348         return avail;
2349 }
2350
2351 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2352 {
2353         if (it != NULL &&
2354             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2355              it->it_op == IT_READDIR ||
2356              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2357                         return true;
2358         return false;
2359 }
2360
2361 /* Get a modify RPC slot from the obd client @cli according
2362  * to the kind of operation @opc that is going to be sent
2363  * and the intent @it of the operation if it applies.
2364  * If the maximum number of modify RPCs in flight is reached
2365  * the thread is put to sleep.
2366  * Returns the tag to be set in the request message. Tag 0
2367  * is reserved for non-modifying requests.
2368  */
2369 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2370                            struct lookup_intent *it)
2371 {
2372         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2373         bool                    close_req = false;
2374         __u16                   i, max;
2375
2376         /* read-only metadata RPCs don't consume a slot on MDT
2377          * for reply reconstruction
2378          */
2379         if (obd_skip_mod_rpc_slot(it))
2380                 return 0;
2381
2382         if (opc == MDS_CLOSE)
2383                 close_req = true;
2384
2385         do {
2386                 spin_lock(&cli->cl_mod_rpcs_lock);
2387                 max = cli->cl_max_mod_rpcs_in_flight;
2388                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2389                         /* there is a slot available */
2390                         cli->cl_mod_rpcs_in_flight++;
2391                         if (close_req)
2392                                 cli->cl_close_rpcs_in_flight++;
2393                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2394                                          cli->cl_mod_rpcs_in_flight);
2395                         /* find a free tag */
2396                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2397                                                 max + 1);
2398                         LASSERT(i < OBD_MAX_RIF_MAX);
2399                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2400                         spin_unlock(&cli->cl_mod_rpcs_lock);
2401                         /* tag 0 is reserved for non-modify RPCs */
2402                         return i + 1;
2403                 }
2404                 spin_unlock(&cli->cl_mod_rpcs_lock);
2405
2406                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2407                        "opc %u, max %hu\n",
2408                        cli->cl_import->imp_obd->obd_name, opc, max);
2409
2410                 l_wait_event(cli->cl_mod_rpcs_waitq,
2411                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2412         } while (true);
2413 }
2414 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2415
2416 /* Put a modify RPC slot from the obd client @cli according
2417  * to the kind of operation @opc that has been sent and the
2418  * intent @it of the operation if it applies.
2419  */
2420 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2421                           struct lookup_intent *it, __u16 tag)
2422 {
2423         bool                    close_req = false;
2424
2425         if (obd_skip_mod_rpc_slot(it))
2426                 return;
2427
2428         if (opc == MDS_CLOSE)
2429                 close_req = true;
2430
2431         spin_lock(&cli->cl_mod_rpcs_lock);
2432         cli->cl_mod_rpcs_in_flight--;
2433         if (close_req)
2434                 cli->cl_close_rpcs_in_flight--;
2435         /* release the tag in the bitmap */
2436         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2437         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2438         spin_unlock(&cli->cl_mod_rpcs_lock);
2439         wake_up(&cli->cl_mod_rpcs_waitq);
2440 }
2441 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2442