Whamcloud - gitweb
LU-8066 obd: cleanup server sysfs symlinks handling
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
/* Taken around every walk or modification of the obd_types list below. */
static DEFINE_SPINLOCK(obd_types_lock);
/* Registered obd types, chained through obd_type::typ_chain. */
static LIST_HEAD(obd_types);
/* Reader/writer lock held around lookups and updates of obd_devs[]. */
DEFINE_RWLOCK(obd_dev_lock);
/* Device table indexed by obd_minor; a NULL entry is a free slot. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache that struct obd_device instances are allocated from. */
static struct kmem_cache *obd_device_cachep;

static struct workqueue_struct *zombie_wq;

/* Forward declarations for helpers defined later in this file. */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);

int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         struct list_head *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         list_for_each(tmp, &obd_types) {
106                 type = list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129                         modname = LUSTRE_OSP_NAME;
130
131                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132                         modname = LUSTRE_MDT_NAME;
133
134                 if (!request_module("%s", modname)) {
135                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136                         type = class_search_type(name);
137                 } else {
138                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139                                            modname);
140                 }
141         }
142 #endif
143         if (type) {
144                 spin_lock(&type->obd_type_lock);
145                 type->typ_refcnt++;
146                 try_module_get(type->typ_dt_ops->o_owner);
147                 spin_unlock(&type->obd_type_lock);
148         }
149         return type;
150 }
151
152 void class_put_type(struct obd_type *type)
153 {
154         LASSERT(type);
155         spin_lock(&type->obd_type_lock);
156         type->typ_refcnt--;
157         module_put(type->typ_dt_ops->o_owner);
158         spin_unlock(&type->obd_type_lock);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165         OBD_FREE(type, sizeof(*type));
166 }
167
168 static struct kobj_type class_ktype = {
169         .sysfs_ops      = &lustre_sysfs_ops,
170         .release        = class_sysfs_release,
171 };
172
#ifdef HAVE_SERVER_SUPPORT
/**
 * Create a placeholder obd_type whose sysfs/debugfs (and optionally proc)
 * directories exist before the real type is registered.
 *
 * The new type is flagged with typ_sym_filter so that
 * class_register_type() and class_unregister_type() leave the debugfs
 * entry created here alone.
 *
 * \param[in] name         name of the kobject and directories to create
 * \param[in] enable_proc  also create a compat directory under
 *                         proc_lustre_root; failure there is only logged
 *
 * \retval type             newly allocated obd_type
 * \retval ERR_PTR(-EEXIST) a kobject called \a name is already in lustre_kset
 * \retval ERR_PTR(-ENOMEM) allocation failed
 * \retval ERR_PTR(errno)   kobject or debugfs setup failed
 */
struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
{
        struct dentry *symlink;
        struct obd_type *type;
        struct kobject *kobj;
        int rc;

        kobj = kset_find_obj(lustre_kset, name);
        if (kobj) {
                /* drop the reference kset_find_obj() took */
                kobject_put(kobj);
                return ERR_PTR(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (!type)
                return ERR_PTR(-ENOMEM);

        type->typ_kobj.kset = lustre_kset;
        rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
                                  &lustre_kset->kobj, "%s", name);
        if (rc) {
                /* Even on failure the kobject has been initialised, so it
                 * must be released through kobject_put(); that invokes
                 * class_sysfs_release() which frees \a type.  Returning
                 * without it would leak the allocation. */
                kobject_put(&type->typ_kobj);
                return ERR_PTR(rc);
        }

        symlink = debugfs_create_dir(name, debugfs_lustre_root);
        if (IS_ERR_OR_NULL(symlink)) {
                rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
                kobject_put(&type->typ_kobj);
                return ERR_PTR(rc);
        }
        type->typ_debugfs_entry = symlink;
        type->typ_sym_filter = true;

        if (enable_proc) {
                type->typ_procroot = lprocfs_register(name, proc_lustre_root,
                                                      NULL, NULL);
                if (IS_ERR(type->typ_procroot)) {
                        /* the proc entry is best-effort: log and carry on */
                        CERROR("%s: can't create compat proc entry: %d\n",
                               name, (int)PTR_ERR(type->typ_procroot));
                        type->typ_procroot = NULL;
                }
        }

        return type;
}
EXPORT_SYMBOL(class_add_symlinks);
#endif /* HAVE_SERVER_SUPPORT */
220
/* Upper bound (exclusive) on the length of a type name, asserted below. */
#define CLASS_MAX_NAME 1024

/**
 * Register a new obd type.
 *
 * Copies \a dt_ops (and \a md_ops when given) into freshly allocated
 * tables, creates the proc/debugfs entries and the sysfs kobject for the
 * type, initialises \a ldt when given, and finally links the type into
 * obd_types.
 *
 * With HAVE_SERVER_SUPPORT, a kobject called \a name that already exists
 * in lustre_kset is reused instead of allocating a new obd_type; if that
 * type has typ_sym_filter set, the proc/debugfs/kobject setup is skipped.
 *
 * \param[in] dt_ops       device operations, copied
 * \param[in] md_ops       metadata operations, copied; may be NULL
 * \param[in] enable_proc  create a directory under proc_lustre_root
 * \param[in] vars         variables handed to ldebugfs_register()
 * \param[in] name         type name; must be shorter than CLASS_MAX_NAME
 * \param[in] ldt          lu device type to initialise; may be NULL
 *
 * \retval 0        success
 * \retval -EEXIST  a type with this name is already registered
 * \retval -ENOMEM  allocation failed
 * \retval errno    error returned by proc/debugfs/kobject/ldt setup
 */
int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
                        bool enable_proc, struct lprocfs_vars *vars,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
#ifdef HAVE_SERVER_SUPPORT
        struct kobject *kobj;
#endif /* HAVE_SERVER_SUPPORT */
        int rc;

        ENTRY;
        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        if (class_search_type(name)) {
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

#ifdef HAVE_SERVER_SUPPORT
        /* A kobject of this name may already be present in lustre_kset;
         * reuse the obd_type that embeds it rather than allocating. */
        kobj = kset_find_obj(lustre_kset, name);
        if (kobj) {
                type = container_of(kobj, struct obd_type, typ_kobj);

                goto dir_exist;
        }
#endif /* HAVE_SERVER_SUPPORT */

        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(-ENOMEM);

        type->typ_kobj.kset = lustre_kset;
        kobject_init(&type->typ_kobj, &class_ktype);
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif /* HAVE_SERVER_SUPPORT */
        /* All three allocations are attempted before any is checked; the
         * failed: path frees whichever of them succeeded. */
        OBD_ALLOC_PTR(type->typ_dt_ops);
        OBD_ALLOC_PTR(type->typ_md_ops);
        OBD_ALLOC(type->typ_name, strlen(name) + 1);

        if (type->typ_dt_ops == NULL ||
            type->typ_md_ops == NULL ||
            type->typ_name == NULL)
                GOTO (failed, rc = -ENOMEM);

        *(type->typ_dt_ops) = *dt_ops;
        /* md_ops is optional */
        if (md_ops)
                *(type->typ_md_ops) = *md_ops;
        strcpy(type->typ_name, name);
        spin_lock_init(&type->obd_type_lock);

#ifdef HAVE_SERVER_SUPPORT
        /* Types flagged typ_sym_filter skip the proc, debugfs and
         * kobject_add() steps below. */
        if (type->typ_sym_filter)
                goto setup_ldt;
#endif
#ifdef CONFIG_PROC_FS
        if (enable_proc && !type->typ_procroot) {
                type->typ_procroot = lprocfs_register(type->typ_name,
                                                      proc_lustre_root,
                                                      NULL, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
        type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
                                                    vars, type);
        if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
                /* a NULL return is reported as -ENOMEM */
                rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
                                             : -ENOMEM;
                type->typ_debugfs_entry = NULL;
                GOTO(failed, rc);
        }

        rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
        if (rc)
                GOTO(failed, rc);
#ifdef HAVE_SERVER_SUPPORT
setup_ldt:
#endif
        if (ldt) {
                type->typ_lu = ldt;
                rc = lu_device_type_init(ldt);
                if (rc)
                        GOTO(failed, rc);
        }

        /* The type becomes visible to class_search_type() only here. */
        spin_lock(&obd_types_lock);
        list_add(&type->typ_chain, &obd_types);
        spin_unlock(&obd_types_lock);

        RETURN(0);

failed:
#ifdef HAVE_SERVER_SUPPORT
        /* For typ_sym_filter types the debugfs entry is forgotten rather
         * than removed here. */
        if (type->typ_sym_filter)
                type->typ_debugfs_entry = NULL;
#endif
        if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
                ldebugfs_remove(&type->typ_debugfs_entry);
        if (type->typ_name != NULL) {
#ifdef CONFIG_PROC_FS
                if (type->typ_procroot != NULL)
                        remove_proc_subtree(type->typ_name, proc_lustre_root);
#endif
                OBD_FREE(type->typ_name, strlen(name) + 1);
        }
        if (type->typ_md_ops != NULL)
                OBD_FREE_PTR(type->typ_md_ops);
        if (type->typ_dt_ops != NULL)
                OBD_FREE_PTR(type->typ_dt_ops);
        /* Drops the reference from kobject_init() or kset_find_obj(). */
        kobject_put(&type->typ_kobj);

        RETURN(rc);
}
EXPORT_SYMBOL(class_register_type);
343
344 int class_unregister_type(const char *name)
345 {
346         struct obd_type *type = class_search_type(name);
347         ENTRY;
348
349         if (!type) {
350                 CERROR("unknown obd type\n");
351                 RETURN(-EINVAL);
352         }
353
354         if (type->typ_refcnt) {
355                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
356                 /* This is a bad situation, let's make the best of it */
357                 /* Remove ops, but leave the name for debugging */
358                 OBD_FREE_PTR(type->typ_dt_ops);
359                 OBD_FREE_PTR(type->typ_md_ops);
360                 RETURN(-EBUSY);
361         }
362
363         /* we do not use type->typ_procroot as for compatibility purposes
364          * other modules can share names (i.e. lod can use lov entry). so
365          * we can't reference pointer as it can get invalided when another
366          * module removes the entry */
367 #ifdef CONFIG_PROC_FS
368         if (type->typ_procroot != NULL)
369                 remove_proc_subtree(type->typ_name, proc_lustre_root);
370 #endif
371 #ifdef HAVE_SERVER_SUPPORT
372         if (type->typ_sym_filter)
373                 type->typ_debugfs_entry = NULL;
374 #endif
375         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
376                 ldebugfs_remove(&type->typ_debugfs_entry);
377
378         if (type->typ_lu)
379                 lu_device_type_fini(type->typ_lu);
380
381         spin_lock(&obd_types_lock);
382         list_del(&type->typ_chain);
383         spin_unlock(&obd_types_lock);
384         OBD_FREE(type->typ_name, strlen(name) + 1);
385         if (type->typ_dt_ops != NULL)
386                 OBD_FREE_PTR(type->typ_dt_ops);
387         if (type->typ_md_ops != NULL)
388                 OBD_FREE_PTR(type->typ_md_ops);
389         kobject_put(&type->typ_kobj);
390
391         RETURN(0);
392 } /* class_unregister_type */
393 EXPORT_SYMBOL(class_unregister_type);
394
395 /**
396  * Create a new obd device.
397  *
398  * Allocate the new obd_device and initialize it.
399  *
400  * \param[in] type_name obd device type string.
401  * \param[in] name      obd device name.
402  * \param[in] uuid      obd device UUID
403  *
404  * \retval newdev         pointer to created obd_device
405  * \retval ERR_PTR(errno) on error
406  */
407 struct obd_device *class_newdev(const char *type_name, const char *name,
408                                 const char *uuid)
409 {
410         struct obd_device *newdev;
411         struct obd_type *type = NULL;
412         ENTRY;
413
414         if (strlen(name) >= MAX_OBD_NAME) {
415                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
416                 RETURN(ERR_PTR(-EINVAL));
417         }
418
419         type = class_get_type(type_name);
420         if (type == NULL){
421                 CERROR("OBD: unknown type: %s\n", type_name);
422                 RETURN(ERR_PTR(-ENODEV));
423         }
424
425         newdev = obd_device_alloc();
426         if (newdev == NULL) {
427                 class_put_type(type);
428                 RETURN(ERR_PTR(-ENOMEM));
429         }
430         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
431         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
432         newdev->obd_type = type;
433         newdev->obd_minor = -1;
434
435         rwlock_init(&newdev->obd_pool_lock);
436         newdev->obd_pool_limit = 0;
437         newdev->obd_pool_slv = 0;
438
439         INIT_LIST_HEAD(&newdev->obd_exports);
440         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
441         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
442         INIT_LIST_HEAD(&newdev->obd_exports_timed);
443         INIT_LIST_HEAD(&newdev->obd_nid_stats);
444         spin_lock_init(&newdev->obd_nid_lock);
445         spin_lock_init(&newdev->obd_dev_lock);
446         mutex_init(&newdev->obd_dev_mutex);
447         spin_lock_init(&newdev->obd_osfs_lock);
448         /* newdev->obd_osfs_age must be set to a value in the distant
449          * past to guarantee a fresh statfs is fetched on mount. */
450         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
451
452         /* XXX belongs in setup not attach  */
453         init_rwsem(&newdev->obd_observer_link_sem);
454         /* recovery data */
455         spin_lock_init(&newdev->obd_recovery_task_lock);
456         init_waitqueue_head(&newdev->obd_next_transno_waitq);
457         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
458         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
459         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
460         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
461         INIT_LIST_HEAD(&newdev->obd_evict_list);
462         INIT_LIST_HEAD(&newdev->obd_lwp_list);
463
464         llog_group_init(&newdev->obd_olg);
465         /* Detach drops this */
466         atomic_set(&newdev->obd_refcount, 1);
467         lu_ref_init(&newdev->obd_reference);
468         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
469
470         newdev->obd_conn_inprogress = 0;
471
472         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
473
474         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
475                newdev->obd_name, newdev);
476
477         return newdev;
478 }
479
480 /**
481  * Free obd device.
482  *
483  * \param[in] obd obd_device to be freed
484  *
485  * \retval none
486  */
487 void class_free_dev(struct obd_device *obd)
488 {
489         struct obd_type *obd_type = obd->obd_type;
490
491         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
492                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
493         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
494                  "obd %p != obd_devs[%d] %p\n",
495                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
496         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
497                  "obd_refcount should be 0, not %d\n",
498                  atomic_read(&obd->obd_refcount));
499         LASSERT(obd_type != NULL);
500
501         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
502                obd->obd_name, obd->obd_type->typ_name);
503
504         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
505                          obd->obd_name, obd->obd_uuid.uuid);
506         if (obd->obd_stopping) {
507                 int err;
508
509                 /* If we're not stopping, we were never set up */
510                 err = obd_cleanup(obd);
511                 if (err)
512                         CERROR("Cleanup %s returned %d\n",
513                                 obd->obd_name, err);
514         }
515
516         obd_device_free(obd);
517
518         class_put_type(obd_type);
519 }
520
521 /**
522  * Unregister obd device.
523  *
524  * Free slot in obd_dev[] used by \a obd.
525  *
526  * \param[in] new_obd obd_device to be unregistered
527  *
528  * \retval none
529  */
530 void class_unregister_device(struct obd_device *obd)
531 {
532         write_lock(&obd_dev_lock);
533         if (obd->obd_minor >= 0) {
534                 LASSERT(obd_devs[obd->obd_minor] == obd);
535                 obd_devs[obd->obd_minor] = NULL;
536                 obd->obd_minor = -1;
537         }
538         write_unlock(&obd_dev_lock);
539 }
540
541 /**
542  * Register obd device.
543  *
544  * Find free slot in obd_devs[], fills it with \a new_obd.
545  *
546  * \param[in] new_obd obd_device to be registered
547  *
548  * \retval 0          success
549  * \retval -EEXIST    device with this name is registered
550  * \retval -EOVERFLOW obd_devs[] is full
551  */
552 int class_register_device(struct obd_device *new_obd)
553 {
554         int ret = 0;
555         int i;
556         int new_obd_minor = 0;
557         bool minor_assign = false;
558         bool retried = false;
559
560 again:
561         write_lock(&obd_dev_lock);
562         for (i = 0; i < class_devno_max(); i++) {
563                 struct obd_device *obd = class_num2obd(i);
564
565                 if (obd != NULL &&
566                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
567
568                         if (!retried) {
569                                 write_unlock(&obd_dev_lock);
570
571                                 /* the obd_device could be waited to be
572                                  * destroyed by the "obd_zombie_impexp_thread".
573                                  */
574                                 obd_zombie_barrier();
575                                 retried = true;
576                                 goto again;
577                         }
578
579                         CERROR("%s: already exists, won't add\n",
580                                obd->obd_name);
581                         /* in case we found a free slot before duplicate */
582                         minor_assign = false;
583                         ret = -EEXIST;
584                         break;
585                 }
586                 if (!minor_assign && obd == NULL) {
587                         new_obd_minor = i;
588                         minor_assign = true;
589                 }
590         }
591
592         if (minor_assign) {
593                 new_obd->obd_minor = new_obd_minor;
594                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
595                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
596                 obd_devs[new_obd_minor] = new_obd;
597         } else {
598                 if (ret == 0) {
599                         ret = -EOVERFLOW;
600                         CERROR("%s: all %u/%u devices used, increase "
601                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
602                                i, class_devno_max(), ret);
603                 }
604         }
605         write_unlock(&obd_dev_lock);
606
607         RETURN(ret);
608 }
609
610 static int class_name2dev_nolock(const char *name)
611 {
612         int i;
613
614         if (!name)
615                 return -1;
616
617         for (i = 0; i < class_devno_max(); i++) {
618                 struct obd_device *obd = class_num2obd(i);
619
620                 if (obd && strcmp(name, obd->obd_name) == 0) {
621                         /* Make sure we finished attaching before we give
622                            out any references */
623                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
624                         if (obd->obd_attached) {
625                                 return i;
626                         }
627                         break;
628                 }
629         }
630
631         return -1;
632 }
633
634 int class_name2dev(const char *name)
635 {
636         int i;
637
638         if (!name)
639                 return -1;
640
641         read_lock(&obd_dev_lock);
642         i = class_name2dev_nolock(name);
643         read_unlock(&obd_dev_lock);
644
645         return i;
646 }
647 EXPORT_SYMBOL(class_name2dev);
648
649 struct obd_device *class_name2obd(const char *name)
650 {
651         int dev = class_name2dev(name);
652
653         if (dev < 0 || dev > class_devno_max())
654                 return NULL;
655         return class_num2obd(dev);
656 }
657 EXPORT_SYMBOL(class_name2obd);
658
659 int class_uuid2dev_nolock(struct obd_uuid *uuid)
660 {
661         int i;
662
663         for (i = 0; i < class_devno_max(); i++) {
664                 struct obd_device *obd = class_num2obd(i);
665
666                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
667                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
668                         return i;
669                 }
670         }
671
672         return -1;
673 }
674
675 int class_uuid2dev(struct obd_uuid *uuid)
676 {
677         int i;
678
679         read_lock(&obd_dev_lock);
680         i = class_uuid2dev_nolock(uuid);
681         read_unlock(&obd_dev_lock);
682
683         return i;
684 }
685 EXPORT_SYMBOL(class_uuid2dev);
686
687 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
688 {
689         int dev = class_uuid2dev(uuid);
690         if (dev < 0)
691                 return NULL;
692         return class_num2obd(dev);
693 }
694 EXPORT_SYMBOL(class_uuid2obd);
695
696 /**
697  * Get obd device from ::obd_devs[]
698  *
699  * \param num [in] array index
700  *
701  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
702  *         otherwise return the obd device there.
703  */
704 struct obd_device *class_num2obd(int num)
705 {
706         struct obd_device *obd = NULL;
707
708         if (num < class_devno_max()) {
709                 obd = obd_devs[num];
710                 if (obd == NULL)
711                         return NULL;
712
713                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
714                          "%p obd_magic %08x != %08x\n",
715                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
716                 LASSERTF(obd->obd_minor == num,
717                          "%p obd_minor %0d != %0d\n",
718                          obd, obd->obd_minor, num);
719         }
720
721         return obd;
722 }
723
724 /**
725  * Find obd in obd_dev[] by name or uuid.
726  *
727  * Increment obd's refcount if found.
728  *
729  * \param[in] str obd name or uuid
730  *
731  * \retval NULL    if not found
732  * \retval target  pointer to found obd_device
733  */
734 struct obd_device *class_dev_by_str(const char *str)
735 {
736         struct obd_device *target = NULL;
737         struct obd_uuid tgtuuid;
738         int rc;
739
740         obd_str2uuid(&tgtuuid, str);
741
742         read_lock(&obd_dev_lock);
743         rc = class_uuid2dev_nolock(&tgtuuid);
744         if (rc < 0)
745                 rc = class_name2dev_nolock(str);
746
747         if (rc >= 0)
748                 target = class_num2obd(rc);
749
750         if (target != NULL)
751                 class_incref(target, "find", current);
752         read_unlock(&obd_dev_lock);
753
754         RETURN(target);
755 }
756 EXPORT_SYMBOL(class_dev_by_str);
757
758 /**
759  * Get obd devices count. Device in any
760  *    state are counted
761  * \retval obd device count
762  */
763 int get_devices_count(void)
764 {
765         int index, max_index = class_devno_max(), dev_count = 0;
766
767         read_lock(&obd_dev_lock);
768         for (index = 0; index <= max_index; index++) {
769                 struct obd_device *obd = class_num2obd(index);
770                 if (obd != NULL)
771                         dev_count++;
772         }
773         read_unlock(&obd_dev_lock);
774
775         return dev_count;
776 }
777 EXPORT_SYMBOL(get_devices_count);
778
779 void class_obd_list(void)
780 {
781         char *status;
782         int i;
783
784         read_lock(&obd_dev_lock);
785         for (i = 0; i < class_devno_max(); i++) {
786                 struct obd_device *obd = class_num2obd(i);
787
788                 if (obd == NULL)
789                         continue;
790                 if (obd->obd_stopping)
791                         status = "ST";
792                 else if (obd->obd_set_up)
793                         status = "UP";
794                 else if (obd->obd_attached)
795                         status = "AT";
796                 else
797                         status = "--";
798                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
799                          i, status, obd->obd_type->typ_name,
800                          obd->obd_name, obd->obd_uuid.uuid,
801                          atomic_read(&obd->obd_refcount));
802         }
803         read_unlock(&obd_dev_lock);
804         return;
805 }
806
807 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
808    specified, then only the client with that uuid is returned,
809    otherwise any client connected to the tgt is returned. */
810 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
811                                           const char * typ_name,
812                                           struct obd_uuid *grp_uuid)
813 {
814         int i;
815
816         read_lock(&obd_dev_lock);
817         for (i = 0; i < class_devno_max(); i++) {
818                 struct obd_device *obd = class_num2obd(i);
819
820                 if (obd == NULL)
821                         continue;
822                 if ((strncmp(obd->obd_type->typ_name, typ_name,
823                              strlen(typ_name)) == 0)) {
824                         if (obd_uuid_equals(tgt_uuid,
825                                             &obd->u.cli.cl_target_uuid) &&
826                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
827                                                          &obd->obd_uuid) : 1)) {
828                                 read_unlock(&obd_dev_lock);
829                                 return obd;
830                         }
831                 }
832         }
833         read_unlock(&obd_dev_lock);
834
835         return NULL;
836 }
837 EXPORT_SYMBOL(class_find_client_obd);
838
839 /* Iterate the obd_device list looking devices have grp_uuid. Start
840    searching at *next, and if a device is found, the next index to look
841    at is saved in *next. If next is NULL, then the first matching device
842    will always be returned. */
843 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
844 {
845         int i;
846
847         if (next == NULL)
848                 i = 0;
849         else if (*next >= 0 && *next < class_devno_max())
850                 i = *next;
851         else
852                 return NULL;
853
854         read_lock(&obd_dev_lock);
855         for (; i < class_devno_max(); i++) {
856                 struct obd_device *obd = class_num2obd(i);
857
858                 if (obd == NULL)
859                         continue;
860                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
861                         if (next != NULL)
862                                 *next = i+1;
863                         read_unlock(&obd_dev_lock);
864                         return obd;
865                 }
866         }
867         read_unlock(&obd_dev_lock);
868
869         return NULL;
870 }
871 EXPORT_SYMBOL(class_devices_in_group);
872
873 /**
874  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
875  * adjust sptlrpc settings accordingly.
876  */
877 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
878 {
879         struct obd_device  *obd;
880         const char         *type;
881         int                 i, rc = 0, rc2;
882
883         LASSERT(namelen > 0);
884
885         read_lock(&obd_dev_lock);
886         for (i = 0; i < class_devno_max(); i++) {
887                 obd = class_num2obd(i);
888
889                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
890                         continue;
891
892                 /* only notify mdc, osc, osp, lwp, mdt, ost
893                  * because only these have a -sptlrpc llog */
894                 type = obd->obd_type->typ_name;
895                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
896                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
897                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
898                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
899                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
900                     strcmp(type, LUSTRE_OST_NAME) != 0)
901                         continue;
902
903                 if (strncmp(obd->obd_name, fsname, namelen))
904                         continue;
905
906                 class_incref(obd, __FUNCTION__, obd);
907                 read_unlock(&obd_dev_lock);
908                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
909                                          sizeof(KEY_SPTLRPC_CONF),
910                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
911                 rc = rc ? rc : rc2;
912                 class_decref(obd, __FUNCTION__, obd);
913                 read_lock(&obd_dev_lock);
914         }
915         read_unlock(&obd_dev_lock);
916         return rc;
917 }
918 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
919
920 void obd_cleanup_caches(void)
921 {
922         ENTRY;
923         if (obd_device_cachep) {
924                 kmem_cache_destroy(obd_device_cachep);
925                 obd_device_cachep = NULL;
926         }
927
928         EXIT;
929 }
930
931 int obd_init_caches(void)
932 {
933         int rc;
934         ENTRY;
935
936         LASSERT(obd_device_cachep == NULL);
937         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
938                                               sizeof(struct obd_device),
939                                               0, 0, NULL);
940         if (!obd_device_cachep)
941                 GOTO(out, rc = -ENOMEM);
942
943         RETURN(0);
944 out:
945         obd_cleanup_caches();
946         RETURN(rc);
947 }
948
949 /* map connection to client */
950 struct obd_export *class_conn2export(struct lustre_handle *conn)
951 {
952         struct obd_export *export;
953         ENTRY;
954
955         if (!conn) {
956                 CDEBUG(D_CACHE, "looking for null handle\n");
957                 RETURN(NULL);
958         }
959
960         if (conn->cookie == -1) {  /* this means assign a new connection */
961                 CDEBUG(D_CACHE, "want a new connection\n");
962                 RETURN(NULL);
963         }
964
965         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
966         export = class_handle2object(conn->cookie, NULL);
967         RETURN(export);
968 }
969 EXPORT_SYMBOL(class_conn2export);
970
971 struct obd_device *class_exp2obd(struct obd_export *exp)
972 {
973         if (exp)
974                 return exp->exp_obd;
975         return NULL;
976 }
977 EXPORT_SYMBOL(class_exp2obd);
978
979 struct obd_import *class_exp2cliimp(struct obd_export *exp)
980 {
981         struct obd_device *obd = exp->exp_obd;
982         if (obd == NULL)
983                 return NULL;
984         return obd->u.cli.cl_import;
985 }
986 EXPORT_SYMBOL(class_exp2cliimp);
987
988 /* Export management functions */
989 static void class_export_destroy(struct obd_export *exp)
990 {
991         struct obd_device *obd = exp->exp_obd;
992         ENTRY;
993
994         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
995         LASSERT(obd != NULL);
996
997         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
998                exp->exp_client_uuid.uuid, obd->obd_name);
999
1000         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
1001         if (exp->exp_connection)
1002                 ptlrpc_put_connection_superhack(exp->exp_connection);
1003
1004         LASSERT(list_empty(&exp->exp_outstanding_replies));
1005         LASSERT(list_empty(&exp->exp_uncommitted_replies));
1006         LASSERT(list_empty(&exp->exp_req_replay_queue));
1007         LASSERT(list_empty(&exp->exp_hp_rpcs));
1008         obd_destroy_export(exp);
1009         /* self export doesn't hold a reference to an obd, although it
1010          * exists until freeing of the obd */
1011         if (exp != obd->obd_self_export)
1012                 class_decref(obd, "export", exp);
1013
1014         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
1015         EXIT;
1016 }
1017
/* hop_addref callback for export handles: take one export reference. */
static void export_handle_addref(void *export)
{
	struct obd_export *exp = export;

	class_export_get(exp);
}
1022
1023 static struct portals_handle_ops export_handle_ops = {
1024         .hop_addref = export_handle_addref,
1025         .hop_free   = NULL,
1026 };
1027
1028 struct obd_export *class_export_get(struct obd_export *exp)
1029 {
1030         atomic_inc(&exp->exp_refcount);
1031         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1032                atomic_read(&exp->exp_refcount));
1033         return exp;
1034 }
1035 EXPORT_SYMBOL(class_export_get);
1036
1037 void class_export_put(struct obd_export *exp)
1038 {
1039         LASSERT(exp != NULL);
1040         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1041         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1042                atomic_read(&exp->exp_refcount) - 1);
1043
1044         if (atomic_dec_and_test(&exp->exp_refcount)) {
1045                 struct obd_device *obd = exp->exp_obd;
1046
1047                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1048                        exp, exp->exp_client_uuid.uuid);
1049
1050                 /* release nid stat refererence */
1051                 lprocfs_exp_cleanup(exp);
1052
1053                 if (exp == obd->obd_self_export) {
1054                         /* self export should be destroyed without
1055                          * zombie thread as it doesn't hold a
1056                          * reference to obd and doesn't hold any
1057                          * resources */
1058                         class_export_destroy(exp);
1059                         /* self export is destroyed, no class
1060                          * references exist and it is safe to free
1061                          * obd */
1062                         class_free_dev(obd);
1063                 } else {
1064                         LASSERT(!list_empty(&exp->exp_obd_chain));
1065                         obd_zombie_export_add(exp);
1066                 }
1067
1068         }
1069 }
1070 EXPORT_SYMBOL(class_export_put);
1071
1072 static void obd_zombie_exp_cull(struct work_struct *ws)
1073 {
1074         struct obd_export *export;
1075
1076         export = container_of(ws, struct obd_export, exp_zombie_work);
1077         class_export_destroy(export);
1078 }
1079
1080 /* Creates a new export, adds it to the hash table, and returns a
1081  * pointer to it. The refcount is 2: one for the hash reference, and
1082  * one for the pointer returned by this function. */
1083 struct obd_export *__class_new_export(struct obd_device *obd,
1084                                       struct obd_uuid *cluuid, bool is_self)
1085 {
1086         struct obd_export *export;
1087         struct cfs_hash *hash = NULL;
1088         int rc = 0;
1089         ENTRY;
1090
1091         OBD_ALLOC_PTR(export);
1092         if (!export)
1093                 return ERR_PTR(-ENOMEM);
1094
1095         export->exp_conn_cnt = 0;
1096         export->exp_lock_hash = NULL;
1097         export->exp_flock_hash = NULL;
1098         /* 2 = class_handle_hash + last */
1099         atomic_set(&export->exp_refcount, 2);
1100         atomic_set(&export->exp_rpc_count, 0);
1101         atomic_set(&export->exp_cb_count, 0);
1102         atomic_set(&export->exp_locks_count, 0);
1103 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1104         INIT_LIST_HEAD(&export->exp_locks_list);
1105         spin_lock_init(&export->exp_locks_list_guard);
1106 #endif
1107         atomic_set(&export->exp_replay_count, 0);
1108         export->exp_obd = obd;
1109         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1110         spin_lock_init(&export->exp_uncommitted_replies_lock);
1111         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1112         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1113         INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1114         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1115         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1116         class_handle_hash(&export->exp_handle, &export_handle_ops);
1117         export->exp_last_request_time = ktime_get_real_seconds();
1118         spin_lock_init(&export->exp_lock);
1119         spin_lock_init(&export->exp_rpc_lock);
1120         INIT_HLIST_NODE(&export->exp_uuid_hash);
1121         INIT_HLIST_NODE(&export->exp_nid_hash);
1122         INIT_HLIST_NODE(&export->exp_gen_hash);
1123         spin_lock_init(&export->exp_bl_list_lock);
1124         INIT_LIST_HEAD(&export->exp_bl_list);
1125         INIT_LIST_HEAD(&export->exp_stale_list);
1126         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1127
1128         export->exp_sp_peer = LUSTRE_SP_ANY;
1129         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1130         export->exp_client_uuid = *cluuid;
1131         obd_init_export(export);
1132
1133         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1134                 spin_lock(&obd->obd_dev_lock);
1135                 /* shouldn't happen, but might race */
1136                 if (obd->obd_stopping)
1137                         GOTO(exit_unlock, rc = -ENODEV);
1138
1139                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1140                 if (hash == NULL)
1141                         GOTO(exit_unlock, rc = -ENODEV);
1142                 spin_unlock(&obd->obd_dev_lock);
1143
1144                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1145                 if (rc != 0) {
1146                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1147                                       obd->obd_name, cluuid->uuid, rc);
1148                         GOTO(exit_err, rc = -EALREADY);
1149                 }
1150         }
1151
1152         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1153         spin_lock(&obd->obd_dev_lock);
1154         if (obd->obd_stopping) {
1155                 if (hash)
1156                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1157                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1158         }
1159
1160         if (!is_self) {
1161                 class_incref(obd, "export", export);
1162                 list_add_tail(&export->exp_obd_chain_timed,
1163                               &obd->obd_exports_timed);
1164                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1165                 obd->obd_num_exports++;
1166         } else {
1167                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1168                 INIT_LIST_HEAD(&export->exp_obd_chain);
1169         }
1170         spin_unlock(&obd->obd_dev_lock);
1171         if (hash)
1172                 cfs_hash_putref(hash);
1173         RETURN(export);
1174
1175 exit_unlock:
1176         spin_unlock(&obd->obd_dev_lock);
1177 exit_err:
1178         if (hash)
1179                 cfs_hash_putref(hash);
1180         class_handle_unhash(&export->exp_handle);
1181         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1182         obd_destroy_export(export);
1183         OBD_FREE_PTR(export);
1184         return ERR_PTR(rc);
1185 }
1186
1187 struct obd_export *class_new_export(struct obd_device *obd,
1188                                     struct obd_uuid *uuid)
1189 {
1190         return __class_new_export(obd, uuid, false);
1191 }
1192 EXPORT_SYMBOL(class_new_export);
1193
1194 struct obd_export *class_new_export_self(struct obd_device *obd,
1195                                          struct obd_uuid *uuid)
1196 {
1197         return __class_new_export(obd, uuid, true);
1198 }
1199
1200 void class_unlink_export(struct obd_export *exp)
1201 {
1202         class_handle_unhash(&exp->exp_handle);
1203
1204         if (exp->exp_obd->obd_self_export == exp) {
1205                 class_export_put(exp);
1206                 return;
1207         }
1208
1209         spin_lock(&exp->exp_obd->obd_dev_lock);
1210         /* delete an uuid-export hashitem from hashtables */
1211         if (!hlist_unhashed(&exp->exp_uuid_hash))
1212                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1213                              &exp->exp_client_uuid,
1214                              &exp->exp_uuid_hash);
1215
1216 #ifdef HAVE_SERVER_SUPPORT
1217         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1218                 struct tg_export_data   *ted = &exp->exp_target_data;
1219                 struct cfs_hash         *hash;
1220
1221                 /* Because obd_gen_hash will not be released until
1222                  * class_cleanup(), so hash should never be NULL here */
1223                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1224                 LASSERT(hash != NULL);
1225                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1226                              &exp->exp_gen_hash);
1227                 cfs_hash_putref(hash);
1228         }
1229 #endif /* HAVE_SERVER_SUPPORT */
1230
1231         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1232         list_del_init(&exp->exp_obd_chain_timed);
1233         exp->exp_obd->obd_num_exports--;
1234         spin_unlock(&exp->exp_obd->obd_dev_lock);
1235         atomic_inc(&obd_stale_export_num);
1236
1237         /* A reference is kept by obd_stale_exports list */
1238         obd_stale_export_put(exp);
1239 }
1240 EXPORT_SYMBOL(class_unlink_export);
1241
1242 /* Import management functions */
1243 static void obd_zombie_import_free(struct obd_import *imp)
1244 {
1245         ENTRY;
1246
1247         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1248                 imp->imp_obd->obd_name);
1249
1250         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1251
1252         ptlrpc_put_connection_superhack(imp->imp_connection);
1253
1254         while (!list_empty(&imp->imp_conn_list)) {
1255                 struct obd_import_conn *imp_conn;
1256
1257                 imp_conn = list_entry(imp->imp_conn_list.next,
1258                                       struct obd_import_conn, oic_item);
1259                 list_del_init(&imp_conn->oic_item);
1260                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1261                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1262         }
1263
1264         LASSERT(imp->imp_sec == NULL);
1265         class_decref(imp->imp_obd, "import", imp);
1266         OBD_FREE_PTR(imp);
1267         EXIT;
1268 }
1269
1270 struct obd_import *class_import_get(struct obd_import *import)
1271 {
1272         atomic_inc(&import->imp_refcount);
1273         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1274                atomic_read(&import->imp_refcount),
1275                import->imp_obd->obd_name);
1276         return import;
1277 }
1278 EXPORT_SYMBOL(class_import_get);
1279
1280 void class_import_put(struct obd_import *imp)
1281 {
1282         ENTRY;
1283
1284         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1285
1286         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1287                atomic_read(&imp->imp_refcount) - 1,
1288                imp->imp_obd->obd_name);
1289
1290         if (atomic_dec_and_test(&imp->imp_refcount)) {
1291                 CDEBUG(D_INFO, "final put import %p\n", imp);
1292                 obd_zombie_import_add(imp);
1293         }
1294
1295         /* catch possible import put race */
1296         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1297         EXIT;
1298 }
1299 EXPORT_SYMBOL(class_import_put);
1300
1301 static void init_imp_at(struct imp_at *at) {
1302         int i;
1303         at_init(&at->iat_net_latency, 0, 0);
1304         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1305                 /* max service estimates are tracked on the server side, so
1306                    don't use the AT history here, just use the last reported
1307                    val. (But keep hist for proc histogram, worst_ever) */
1308                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1309                         AT_FLG_NOHIST);
1310         }
1311 }
1312
1313 static void obd_zombie_imp_cull(struct work_struct *ws)
1314 {
1315         struct obd_import *import;
1316
1317         import = container_of(ws, struct obd_import, imp_zombie_work);
1318         obd_zombie_import_free(import);
1319 }
1320
1321 struct obd_import *class_new_import(struct obd_device *obd)
1322 {
1323         struct obd_import *imp;
1324         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1325
1326         OBD_ALLOC(imp, sizeof(*imp));
1327         if (imp == NULL)
1328                 return NULL;
1329
1330         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1331         INIT_LIST_HEAD(&imp->imp_replay_list);
1332         INIT_LIST_HEAD(&imp->imp_sending_list);
1333         INIT_LIST_HEAD(&imp->imp_delayed_list);
1334         INIT_LIST_HEAD(&imp->imp_committed_list);
1335         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1336         imp->imp_known_replied_xid = 0;
1337         imp->imp_replay_cursor = &imp->imp_committed_list;
1338         spin_lock_init(&imp->imp_lock);
1339         imp->imp_last_success_conn = 0;
1340         imp->imp_state = LUSTRE_IMP_NEW;
1341         imp->imp_obd = class_incref(obd, "import", imp);
1342         mutex_init(&imp->imp_sec_mutex);
1343         init_waitqueue_head(&imp->imp_recovery_waitq);
1344         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1345
1346         if (curr_pid_ns->child_reaper)
1347                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1348         else
1349                 imp->imp_sec_refpid = 1;
1350
1351         atomic_set(&imp->imp_refcount, 2);
1352         atomic_set(&imp->imp_unregistering, 0);
1353         atomic_set(&imp->imp_inflight, 0);
1354         atomic_set(&imp->imp_replay_inflight, 0);
1355         atomic_set(&imp->imp_inval_count, 0);
1356         INIT_LIST_HEAD(&imp->imp_conn_list);
1357         init_imp_at(&imp->imp_at);
1358
1359         /* the default magic is V2, will be used in connect RPC, and
1360          * then adjusted according to the flags in request/reply. */
1361         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1362
1363         return imp;
1364 }
1365 EXPORT_SYMBOL(class_new_import);
1366
1367 void class_destroy_import(struct obd_import *import)
1368 {
1369         LASSERT(import != NULL);
1370         LASSERT(import != LP_POISON);
1371
1372         spin_lock(&import->imp_lock);
1373         import->imp_generation++;
1374         spin_unlock(&import->imp_lock);
1375         class_import_put(import);
1376 }
1377 EXPORT_SYMBOL(class_destroy_import);
1378
1379 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1380
1381 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1382 {
1383         spin_lock(&exp->exp_locks_list_guard);
1384
1385         LASSERT(lock->l_exp_refs_nr >= 0);
1386
1387         if (lock->l_exp_refs_target != NULL &&
1388             lock->l_exp_refs_target != exp) {
1389                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1390                               exp, lock, lock->l_exp_refs_target);
1391         }
1392         if ((lock->l_exp_refs_nr ++) == 0) {
1393                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1394                 lock->l_exp_refs_target = exp;
1395         }
1396         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1397                lock, exp, lock->l_exp_refs_nr);
1398         spin_unlock(&exp->exp_locks_list_guard);
1399 }
1400 EXPORT_SYMBOL(__class_export_add_lock_ref);
1401
1402 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1403 {
1404         spin_lock(&exp->exp_locks_list_guard);
1405         LASSERT(lock->l_exp_refs_nr > 0);
1406         if (lock->l_exp_refs_target != exp) {
1407                 LCONSOLE_WARN("lock %p, "
1408                               "mismatching export pointers: %p, %p\n",
1409                               lock, lock->l_exp_refs_target, exp);
1410         }
1411         if (-- lock->l_exp_refs_nr == 0) {
1412                 list_del_init(&lock->l_exp_refs_link);
1413                 lock->l_exp_refs_target = NULL;
1414         }
1415         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1416                lock, exp, lock->l_exp_refs_nr);
1417         spin_unlock(&exp->exp_locks_list_guard);
1418 }
1419 EXPORT_SYMBOL(__class_export_del_lock_ref);
1420 #endif
1421
1422 /* A connection defines an export context in which preallocation can
1423    be managed. This releases the export pointer reference, and returns
1424    the export handle, so the export refcount is 1 when this function
1425    returns. */
1426 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1427                   struct obd_uuid *cluuid)
1428 {
1429         struct obd_export *export;
1430         LASSERT(conn != NULL);
1431         LASSERT(obd != NULL);
1432         LASSERT(cluuid != NULL);
1433         ENTRY;
1434
1435         export = class_new_export(obd, cluuid);
1436         if (IS_ERR(export))
1437                 RETURN(PTR_ERR(export));
1438
1439         conn->cookie = export->exp_handle.h_cookie;
1440         class_export_put(export);
1441
1442         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1443                cluuid->uuid, conn->cookie);
1444         RETURN(0);
1445 }
1446 EXPORT_SYMBOL(class_connect);
1447
1448 /* if export is involved in recovery then clean up related things */
1449 static void class_export_recovery_cleanup(struct obd_export *exp)
1450 {
1451         struct obd_device *obd = exp->exp_obd;
1452
1453         spin_lock(&obd->obd_recovery_task_lock);
1454         if (obd->obd_recovering) {
1455                 if (exp->exp_in_recovery) {
1456                         spin_lock(&exp->exp_lock);
1457                         exp->exp_in_recovery = 0;
1458                         spin_unlock(&exp->exp_lock);
1459                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1460                         atomic_dec(&obd->obd_connected_clients);
1461                 }
1462
1463                 /* if called during recovery then should update
1464                  * obd_stale_clients counter,
1465                  * lightweight exports are not counted */
1466                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1467                         exp->exp_obd->obd_stale_clients++;
1468         }
1469         spin_unlock(&obd->obd_recovery_task_lock);
1470
1471         spin_lock(&exp->exp_lock);
1472         /** Cleanup req replay fields */
1473         if (exp->exp_req_replay_needed) {
1474                 exp->exp_req_replay_needed = 0;
1475
1476                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1477                 atomic_dec(&obd->obd_req_replay_clients);
1478         }
1479
1480         /** Cleanup lock replay data */
1481         if (exp->exp_lock_replay_needed) {
1482                 exp->exp_lock_replay_needed = 0;
1483
1484                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1485                 atomic_dec(&obd->obd_lock_replay_clients);
1486         }
1487         spin_unlock(&exp->exp_lock);
1488 }
1489
1490 /* This function removes 1-3 references from the export:
1491  * 1 - for export pointer passed
1492  * and if disconnect really need
1493  * 2 - removing from hash
1494  * 3 - in client_unlink_export
1495  * The export pointer passed to this function can destroyed */
1496 int class_disconnect(struct obd_export *export)
1497 {
1498         int already_disconnected;
1499         ENTRY;
1500
1501         if (export == NULL) {
1502                 CWARN("attempting to free NULL export %p\n", export);
1503                 RETURN(-EINVAL);
1504         }
1505
1506         spin_lock(&export->exp_lock);
1507         already_disconnected = export->exp_disconnected;
1508         export->exp_disconnected = 1;
1509         /*  We hold references of export for uuid hash
1510          *  and nid_hash and export link at least. So
1511          *  it is safe to call cfs_hash_del in there.  */
1512         if (!hlist_unhashed(&export->exp_nid_hash))
1513                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1514                              &export->exp_connection->c_peer.nid,
1515                              &export->exp_nid_hash);
1516         spin_unlock(&export->exp_lock);
1517
1518         /* class_cleanup(), abort_recovery(), and class_fail_export()
1519          * all end up in here, and if any of them race we shouldn't
1520          * call extra class_export_puts(). */
1521         if (already_disconnected) {
1522                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1523                 GOTO(no_disconn, already_disconnected);
1524         }
1525
1526         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1527                export->exp_handle.h_cookie);
1528
1529         class_export_recovery_cleanup(export);
1530         class_unlink_export(export);
1531 no_disconn:
1532         class_export_put(export);
1533         RETURN(0);
1534 }
1535 EXPORT_SYMBOL(class_disconnect);
1536
1537 /* Return non-zero for a fully connected export */
1538 int class_connected_export(struct obd_export *exp)
1539 {
1540         int connected = 0;
1541
1542         if (exp) {
1543                 spin_lock(&exp->exp_lock);
1544                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1545                 spin_unlock(&exp->exp_lock);
1546         }
1547         return connected;
1548 }
1549 EXPORT_SYMBOL(class_connected_export);
1550
1551 static void class_disconnect_export_list(struct list_head *list,
1552                                          enum obd_option flags)
1553 {
1554         int rc;
1555         struct obd_export *exp;
1556         ENTRY;
1557
1558         /* It's possible that an export may disconnect itself, but
1559          * nothing else will be added to this list. */
1560         while (!list_empty(list)) {
1561                 exp = list_entry(list->next, struct obd_export,
1562                                  exp_obd_chain);
1563                 /* need for safe call CDEBUG after obd_disconnect */
1564                 class_export_get(exp);
1565
1566                 spin_lock(&exp->exp_lock);
1567                 exp->exp_flags = flags;
1568                 spin_unlock(&exp->exp_lock);
1569
1570                 if (obd_uuid_equals(&exp->exp_client_uuid,
1571                                     &exp->exp_obd->obd_uuid)) {
1572                         CDEBUG(D_HA,
1573                                "exp %p export uuid == obd uuid, don't discon\n",
1574                                exp);
1575                         /* Need to delete this now so we don't end up pointing
1576                          * to work_list later when this export is cleaned up. */
1577                         list_del_init(&exp->exp_obd_chain);
1578                         class_export_put(exp);
1579                         continue;
1580                 }
1581
1582                 class_export_get(exp);
1583                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1584                        "last request at %lld\n",
1585                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1586                        exp, exp->exp_last_request_time);
1587                 /* release one export reference anyway */
1588                 rc = obd_disconnect(exp);
1589
1590                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1591                        obd_export_nid2str(exp), exp, rc);
1592                 class_export_put(exp);
1593         }
1594         EXIT;
1595 }
1596
1597 void class_disconnect_exports(struct obd_device *obd)
1598 {
1599         struct list_head work_list;
1600         ENTRY;
1601
1602         /* Move all of the exports from obd_exports to a work list, en masse. */
1603         INIT_LIST_HEAD(&work_list);
1604         spin_lock(&obd->obd_dev_lock);
1605         list_splice_init(&obd->obd_exports, &work_list);
1606         list_splice_init(&obd->obd_delayed_exports, &work_list);
1607         spin_unlock(&obd->obd_dev_lock);
1608
1609         if (!list_empty(&work_list)) {
1610                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1611                        "disconnecting them\n", obd->obd_minor, obd);
1612                 class_disconnect_export_list(&work_list,
1613                                              exp_flags_from_obd(obd));
1614         } else
1615                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1616                        obd->obd_minor, obd);
1617         EXIT;
1618 }
1619 EXPORT_SYMBOL(class_disconnect_exports);
1620
1621 /* Remove exports that have not completed recovery.
1622  */
1623 void class_disconnect_stale_exports(struct obd_device *obd,
1624                                     int (*test_export)(struct obd_export *))
1625 {
1626         struct list_head work_list;
1627         struct obd_export *exp, *n;
1628         int evicted = 0;
1629         ENTRY;
1630
1631         INIT_LIST_HEAD(&work_list);
1632         spin_lock(&obd->obd_dev_lock);
1633         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1634                                  exp_obd_chain) {
1635                 /* don't count self-export as client */
1636                 if (obd_uuid_equals(&exp->exp_client_uuid,
1637                                     &exp->exp_obd->obd_uuid))
1638                         continue;
1639
1640                 /* don't evict clients which have no slot in last_rcvd
1641                  * (e.g. lightweight connection) */
1642                 if (exp->exp_target_data.ted_lr_idx == -1)
1643                         continue;
1644
1645                 spin_lock(&exp->exp_lock);
1646                 if (exp->exp_failed || test_export(exp)) {
1647                         spin_unlock(&exp->exp_lock);
1648                         continue;
1649                 }
1650                 exp->exp_failed = 1;
1651                 spin_unlock(&exp->exp_lock);
1652
1653                 list_move(&exp->exp_obd_chain, &work_list);
1654                 evicted++;
1655                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1656                        obd->obd_name, exp->exp_client_uuid.uuid,
1657                        obd_export_nid2str(exp));
1658                 print_export_data(exp, "EVICTING", 0, D_HA);
1659         }
1660         spin_unlock(&obd->obd_dev_lock);
1661
1662         if (evicted)
1663                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1664                               obd->obd_name, evicted);
1665
1666         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1667                                                  OBD_OPT_ABORT_RECOV);
1668         EXIT;
1669 }
1670 EXPORT_SYMBOL(class_disconnect_stale_exports);
1671
1672 void class_fail_export(struct obd_export *exp)
1673 {
1674         int rc, already_failed;
1675
1676         spin_lock(&exp->exp_lock);
1677         already_failed = exp->exp_failed;
1678         exp->exp_failed = 1;
1679         spin_unlock(&exp->exp_lock);
1680
1681         if (already_failed) {
1682                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1683                        exp, exp->exp_client_uuid.uuid);
1684                 return;
1685         }
1686
1687         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1688                exp, exp->exp_client_uuid.uuid);
1689
1690         if (obd_dump_on_timeout)
1691                 libcfs_debug_dumplog();
1692
1693         /* need for safe call CDEBUG after obd_disconnect */
1694         class_export_get(exp);
1695
1696         /* Most callers into obd_disconnect are removing their own reference
1697          * (request, for example) in addition to the one from the hash table.
1698          * We don't have such a reference here, so make one. */
1699         class_export_get(exp);
1700         rc = obd_disconnect(exp);
1701         if (rc)
1702                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1703         else
1704                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1705                        exp, exp->exp_client_uuid.uuid);
1706         class_export_put(exp);
1707 }
1708 EXPORT_SYMBOL(class_fail_export);
1709
1710 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1711 {
1712         struct cfs_hash *nid_hash;
1713         struct obd_export *doomed_exp = NULL;
1714         int exports_evicted = 0;
1715
1716         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1717
1718         spin_lock(&obd->obd_dev_lock);
1719         /* umount has run already, so evict thread should leave
1720          * its task to umount thread now */
1721         if (obd->obd_stopping) {
1722                 spin_unlock(&obd->obd_dev_lock);
1723                 return exports_evicted;
1724         }
1725         nid_hash = obd->obd_nid_hash;
1726         cfs_hash_getref(nid_hash);
1727         spin_unlock(&obd->obd_dev_lock);
1728
1729         do {
1730                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1731                 if (doomed_exp == NULL)
1732                         break;
1733
1734                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1735                          "nid %s found, wanted nid %s, requested nid %s\n",
1736                          obd_export_nid2str(doomed_exp),
1737                          libcfs_nid2str(nid_key), nid);
1738                 LASSERTF(doomed_exp != obd->obd_self_export,
1739                          "self-export is hashed by NID?\n");
1740                 exports_evicted++;
1741                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1742                               "request\n", obd->obd_name,
1743                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1744                               obd_export_nid2str(doomed_exp));
1745                 class_fail_export(doomed_exp);
1746                 class_export_put(doomed_exp);
1747         } while (1);
1748
1749         cfs_hash_putref(nid_hash);
1750
1751         if (!exports_evicted)
1752                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1753                        obd->obd_name, nid);
1754         return exports_evicted;
1755 }
1756 EXPORT_SYMBOL(obd_export_evict_by_nid);
1757
1758 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1759 {
1760         struct cfs_hash *uuid_hash;
1761         struct obd_export *doomed_exp = NULL;
1762         struct obd_uuid doomed_uuid;
1763         int exports_evicted = 0;
1764
1765         spin_lock(&obd->obd_dev_lock);
1766         if (obd->obd_stopping) {
1767                 spin_unlock(&obd->obd_dev_lock);
1768                 return exports_evicted;
1769         }
1770         uuid_hash = obd->obd_uuid_hash;
1771         cfs_hash_getref(uuid_hash);
1772         spin_unlock(&obd->obd_dev_lock);
1773
1774         obd_str2uuid(&doomed_uuid, uuid);
1775         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1776                 CERROR("%s: can't evict myself\n", obd->obd_name);
1777                 cfs_hash_putref(uuid_hash);
1778                 return exports_evicted;
1779         }
1780
1781         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1782
1783         if (doomed_exp == NULL) {
1784                 CERROR("%s: can't disconnect %s: no exports found\n",
1785                        obd->obd_name, uuid);
1786         } else {
1787                 CWARN("%s: evicting %s at adminstrative request\n",
1788                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1789                 class_fail_export(doomed_exp);
1790                 class_export_put(doomed_exp);
1791                 exports_evicted++;
1792         }
1793         cfs_hash_putref(uuid_hash);
1794
1795         return exports_evicted;
1796 }
1797
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback, unset by default; invoked on an export by
 * print_export_data() when lock dumping is requested. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1802
1803 static void print_export_data(struct obd_export *exp, const char *status,
1804                               int locks, int debug_level)
1805 {
1806         struct ptlrpc_reply_state *rs;
1807         struct ptlrpc_reply_state *first_reply = NULL;
1808         int nreplies = 0;
1809
1810         spin_lock(&exp->exp_lock);
1811         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1812                             rs_exp_list) {
1813                 if (nreplies == 0)
1814                         first_reply = rs;
1815                 nreplies++;
1816         }
1817         spin_unlock(&exp->exp_lock);
1818
1819         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1820                "%p %s %llu stale:%d\n",
1821                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1822                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1823                atomic_read(&exp->exp_rpc_count),
1824                atomic_read(&exp->exp_cb_count),
1825                atomic_read(&exp->exp_locks_count),
1826                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1827                nreplies, first_reply, nreplies > 3 ? "..." : "",
1828                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1829 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1830         if (locks && class_export_dump_hook != NULL)
1831                 class_export_dump_hook(exp);
1832 #endif
1833 }
1834
1835 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1836 {
1837         struct obd_export *exp;
1838
1839         spin_lock(&obd->obd_dev_lock);
1840         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1841                 print_export_data(exp, "ACTIVE", locks, debug_level);
1842         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1843                 print_export_data(exp, "UNLINKED", locks, debug_level);
1844         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1845                 print_export_data(exp, "DELAYED", locks, debug_level);
1846         spin_unlock(&obd->obd_dev_lock);
1847 }
1848
1849 void obd_exports_barrier(struct obd_device *obd)
1850 {
1851         int waited = 2;
1852         LASSERT(list_empty(&obd->obd_exports));
1853         spin_lock(&obd->obd_dev_lock);
1854         while (!list_empty(&obd->obd_unlinked_exports)) {
1855                 spin_unlock(&obd->obd_dev_lock);
1856                 set_current_state(TASK_UNINTERRUPTIBLE);
1857                 schedule_timeout(cfs_time_seconds(waited));
1858                 if (waited > 5 && is_power_of_2(waited)) {
1859                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1860                                       "more than %d seconds. "
1861                                       "The obd refcount = %d. Is it stuck?\n",
1862                                       obd->obd_name, waited,
1863                                       atomic_read(&obd->obd_refcount));
1864                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1865                 }
1866                 waited *= 2;
1867                 spin_lock(&obd->obd_dev_lock);
1868         }
1869         spin_unlock(&obd->obd_dev_lock);
1870 }
1871 EXPORT_SYMBOL(obd_exports_barrier);
1872
1873 /**
1874  * Add export to the obd_zombe thread and notify it.
1875  */
1876 static void obd_zombie_export_add(struct obd_export *exp) {
1877         atomic_dec(&obd_stale_export_num);
1878         spin_lock(&exp->exp_obd->obd_dev_lock);
1879         LASSERT(!list_empty(&exp->exp_obd_chain));
1880         list_del_init(&exp->exp_obd_chain);
1881         spin_unlock(&exp->exp_obd->obd_dev_lock);
1882
1883         queue_work(zombie_wq, &exp->exp_zombie_work);
1884 }
1885
1886 /**
1887  * Add import to the obd_zombe thread and notify it.
1888  */
1889 static void obd_zombie_import_add(struct obd_import *imp) {
1890         LASSERT(imp->imp_sec == NULL);
1891
1892         queue_work(zombie_wq, &imp->imp_zombie_work);
1893 }
1894
1895 /**
1896  * wait when obd_zombie import/export queues become empty
1897  */
1898 void obd_zombie_barrier(void)
1899 {
1900         flush_workqueue(zombie_wq);
1901 }
1902 EXPORT_SYMBOL(obd_zombie_barrier);
1903
1904
1905 struct obd_export *obd_stale_export_get(void)
1906 {
1907         struct obd_export *exp = NULL;
1908         ENTRY;
1909
1910         spin_lock(&obd_stale_export_lock);
1911         if (!list_empty(&obd_stale_exports)) {
1912                 exp = list_entry(obd_stale_exports.next,
1913                                  struct obd_export, exp_stale_list);
1914                 list_del_init(&exp->exp_stale_list);
1915         }
1916         spin_unlock(&obd_stale_export_lock);
1917
1918         if (exp) {
1919                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1920                        atomic_read(&obd_stale_export_num));
1921         }
1922         RETURN(exp);
1923 }
1924 EXPORT_SYMBOL(obd_stale_export_get);
1925
1926 void obd_stale_export_put(struct obd_export *exp)
1927 {
1928         ENTRY;
1929
1930         LASSERT(list_empty(&exp->exp_stale_list));
1931         if (exp->exp_lock_hash &&
1932             atomic_read(&exp->exp_lock_hash->hs_count)) {
1933                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1934                        atomic_read(&obd_stale_export_num));
1935
1936                 spin_lock_bh(&exp->exp_bl_list_lock);
1937                 spin_lock(&obd_stale_export_lock);
1938                 /* Add to the tail if there is no blocked locks,
1939                  * to the head otherwise. */
1940                 if (list_empty(&exp->exp_bl_list))
1941                         list_add_tail(&exp->exp_stale_list,
1942                                       &obd_stale_exports);
1943                 else
1944                         list_add(&exp->exp_stale_list,
1945                                  &obd_stale_exports);
1946
1947                 spin_unlock(&obd_stale_export_lock);
1948                 spin_unlock_bh(&exp->exp_bl_list_lock);
1949         } else {
1950                 class_export_put(exp);
1951         }
1952         EXIT;
1953 }
1954 EXPORT_SYMBOL(obd_stale_export_put);
1955
1956 /**
1957  * Adjust the position of the export in the stale list,
1958  * i.e. move to the head of the list if is needed.
1959  **/
1960 void obd_stale_export_adjust(struct obd_export *exp)
1961 {
1962         LASSERT(exp != NULL);
1963         spin_lock_bh(&exp->exp_bl_list_lock);
1964         spin_lock(&obd_stale_export_lock);
1965
1966         if (!list_empty(&exp->exp_stale_list) &&
1967             !list_empty(&exp->exp_bl_list))
1968                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1969
1970         spin_unlock(&obd_stale_export_lock);
1971         spin_unlock_bh(&exp->exp_bl_list_lock);
1972 }
1973 EXPORT_SYMBOL(obd_stale_export_adjust);
1974
1975 /**
1976  * start destroy zombie import/export thread
1977  */
1978 int obd_zombie_impexp_init(void)
1979 {
1980         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1981         if (!zombie_wq)
1982                 return -ENOMEM;
1983
1984         return 0;
1985 }
1986
1987 /**
1988  * stop destroy zombie import/export thread
1989  */
1990 void obd_zombie_impexp_stop(void)
1991 {
1992         destroy_workqueue(zombie_wq);
1993         LASSERT(list_empty(&obd_stale_exports));
1994 }
1995
1996 /***** Kernel-userspace comm helpers *******/
1997
1998 /* Get length of entire message, including header */
1999 int kuc_len(int payload_len)
2000 {
2001         return sizeof(struct kuc_hdr) + payload_len;
2002 }
2003 EXPORT_SYMBOL(kuc_len);
2004
2005 /* Get a pointer to kuc header, given a ptr to the payload
2006  * @param p Pointer to payload area
2007  * @returns Pointer to kuc header
2008  */
2009 struct kuc_hdr * kuc_ptr(void *p)
2010 {
2011         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2012         LASSERT(lh->kuc_magic == KUC_MAGIC);
2013         return lh;
2014 }
2015 EXPORT_SYMBOL(kuc_ptr);
2016
2017 /* Alloc space for a message, and fill in header
2018  * @return Pointer to payload area
2019  */
2020 void *kuc_alloc(int payload_len, int transport, int type)
2021 {
2022         struct kuc_hdr *lh;
2023         int len = kuc_len(payload_len);
2024
2025         OBD_ALLOC(lh, len);
2026         if (lh == NULL)
2027                 return ERR_PTR(-ENOMEM);
2028
2029         lh->kuc_magic = KUC_MAGIC;
2030         lh->kuc_transport = transport;
2031         lh->kuc_msgtype = type;
2032         lh->kuc_msglen = len;
2033
2034         return (void *)(lh + 1);
2035 }
2036 EXPORT_SYMBOL(kuc_alloc);
2037
/* Free a message; @p is the payload pointer (as returned by
 * kuc_alloc()) and @payload_len the payload size it was allocated with.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh;

	lh = kuc_ptr(p);
	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
2045
2046 struct obd_request_slot_waiter {
2047         struct list_head        orsw_entry;
2048         wait_queue_head_t       orsw_waitq;
2049         bool                    orsw_signaled;
2050 };
2051
2052 static bool obd_request_slot_avail(struct client_obd *cli,
2053                                    struct obd_request_slot_waiter *orsw)
2054 {
2055         bool avail;
2056
2057         spin_lock(&cli->cl_loi_list_lock);
2058         avail = !!list_empty(&orsw->orsw_entry);
2059         spin_unlock(&cli->cl_loi_list_lock);
2060
2061         return avail;
2062 };
2063
2064 /*
2065  * For network flow control, the RPC sponsor needs to acquire a credit
2066  * before sending the RPC. The credits count for a connection is defined
2067  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2068  * the subsequent RPC sponsors need to wait until others released their
2069  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2070  */
2071 int obd_get_request_slot(struct client_obd *cli)
2072 {
2073         struct obd_request_slot_waiter   orsw;
2074         struct l_wait_info               lwi;
2075         int                              rc;
2076
2077         spin_lock(&cli->cl_loi_list_lock);
2078         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2079                 cli->cl_rpcs_in_flight++;
2080                 spin_unlock(&cli->cl_loi_list_lock);
2081                 return 0;
2082         }
2083
2084         init_waitqueue_head(&orsw.orsw_waitq);
2085         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2086         orsw.orsw_signaled = false;
2087         spin_unlock(&cli->cl_loi_list_lock);
2088
2089         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2090         rc = l_wait_event(orsw.orsw_waitq,
2091                           obd_request_slot_avail(cli, &orsw) ||
2092                           orsw.orsw_signaled,
2093                           &lwi);
2094
2095         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2096          * freed but other (such as obd_put_request_slot) is using it. */
2097         spin_lock(&cli->cl_loi_list_lock);
2098         if (rc != 0) {
2099                 if (!orsw.orsw_signaled) {
2100                         if (list_empty(&orsw.orsw_entry))
2101                                 cli->cl_rpcs_in_flight--;
2102                         else
2103                                 list_del(&orsw.orsw_entry);
2104                 }
2105         }
2106
2107         if (orsw.orsw_signaled) {
2108                 LASSERT(list_empty(&orsw.orsw_entry));
2109
2110                 rc = -EINTR;
2111         }
2112         spin_unlock(&cli->cl_loi_list_lock);
2113
2114         return rc;
2115 }
2116 EXPORT_SYMBOL(obd_get_request_slot);
2117
2118 void obd_put_request_slot(struct client_obd *cli)
2119 {
2120         struct obd_request_slot_waiter *orsw;
2121
2122         spin_lock(&cli->cl_loi_list_lock);
2123         cli->cl_rpcs_in_flight--;
2124
2125         /* If there is free slot, wakeup the first waiter. */
2126         if (!list_empty(&cli->cl_flight_waiters) &&
2127             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2128                 orsw = list_entry(cli->cl_flight_waiters.next,
2129                                   struct obd_request_slot_waiter, orsw_entry);
2130                 list_del_init(&orsw->orsw_entry);
2131                 cli->cl_rpcs_in_flight++;
2132                 wake_up(&orsw->orsw_waitq);
2133         }
2134         spin_unlock(&cli->cl_loi_list_lock);
2135 }
2136 EXPORT_SYMBOL(obd_put_request_slot);
2137
2138 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2139 {
2140         return cli->cl_max_rpcs_in_flight;
2141 }
2142 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2143
2144 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2145 {
2146         struct obd_request_slot_waiter *orsw;
2147         __u32                           old;
2148         int                             diff;
2149         int                             i;
2150         char                            *typ_name;
2151         int                             rc;
2152
2153         if (max > OBD_MAX_RIF_MAX || max < 1)
2154                 return -ERANGE;
2155
2156         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2157         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2158                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2159                  * strictly lower that max_rpcs_in_flight */
2160                 if (max < 2) {
2161                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2162                                "because it must be higher than "
2163                                "max_mod_rpcs_in_flight value",
2164                                cli->cl_import->imp_obd->obd_name);
2165                         return -ERANGE;
2166                 }
2167                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2168                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2169                         if (rc != 0)
2170                                 return rc;
2171                 }
2172         }
2173
2174         spin_lock(&cli->cl_loi_list_lock);
2175         old = cli->cl_max_rpcs_in_flight;
2176         cli->cl_max_rpcs_in_flight = max;
2177         client_adjust_max_dirty(cli);
2178
2179         diff = max - old;
2180
2181         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2182         for (i = 0; i < diff; i++) {
2183                 if (list_empty(&cli->cl_flight_waiters))
2184                         break;
2185
2186                 orsw = list_entry(cli->cl_flight_waiters.next,
2187                                   struct obd_request_slot_waiter, orsw_entry);
2188                 list_del_init(&orsw->orsw_entry);
2189                 cli->cl_rpcs_in_flight++;
2190                 wake_up(&orsw->orsw_waitq);
2191         }
2192         spin_unlock(&cli->cl_loi_list_lock);
2193
2194         return 0;
2195 }
2196 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2197
2198 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2199 {
2200         return cli->cl_max_mod_rpcs_in_flight;
2201 }
2202 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2203
2204 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2205 {
2206         struct obd_connect_data *ocd;
2207         __u16 maxmodrpcs;
2208         __u16 prev;
2209
2210         if (max > OBD_MAX_RIF_MAX || max < 1)
2211                 return -ERANGE;
2212
2213         /* cannot exceed or equal max_rpcs_in_flight */
2214         if (max >= cli->cl_max_rpcs_in_flight) {
2215                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2216                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2217                        cli->cl_import->imp_obd->obd_name,
2218                        max, cli->cl_max_rpcs_in_flight);
2219                 return -ERANGE;
2220         }
2221
2222         /* cannot exceed max modify RPCs in flight supported by the server */
2223         ocd = &cli->cl_import->imp_connect_data;
2224         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2225                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2226         else
2227                 maxmodrpcs = 1;
2228         if (max > maxmodrpcs) {
2229                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2230                        "higher than max_mod_rpcs_per_client value (%hu) "
2231                        "returned by the server at connection\n",
2232                        cli->cl_import->imp_obd->obd_name,
2233                        max, maxmodrpcs);
2234                 return -ERANGE;
2235         }
2236
2237         spin_lock(&cli->cl_mod_rpcs_lock);
2238
2239         prev = cli->cl_max_mod_rpcs_in_flight;
2240         cli->cl_max_mod_rpcs_in_flight = max;
2241
2242         /* wakeup waiters if limit has been increased */
2243         if (cli->cl_max_mod_rpcs_in_flight > prev)
2244                 wake_up(&cli->cl_mod_rpcs_waitq);
2245
2246         spin_unlock(&cli->cl_mod_rpcs_lock);
2247
2248         return 0;
2249 }
2250 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2251
2252 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2253                                struct seq_file *seq)
2254 {
2255         unsigned long mod_tot = 0, mod_cum;
2256         struct timespec64 now;
2257         int i;
2258
2259         ktime_get_real_ts64(&now);
2260
2261         spin_lock(&cli->cl_mod_rpcs_lock);
2262
2263         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2264                    (s64)now.tv_sec, now.tv_nsec);
2265         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2266                    cli->cl_mod_rpcs_in_flight);
2267
2268         seq_printf(seq, "\n\t\t\tmodify\n");
2269         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2270
2271         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2272
2273         mod_cum = 0;
2274         for (i = 0; i < OBD_HIST_MAX; i++) {
2275                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2276                 mod_cum += mod;
2277                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2278                            i, mod, pct(mod, mod_tot),
2279                            pct(mod_cum, mod_tot));
2280                 if (mod_cum == mod_tot)
2281                         break;
2282         }
2283
2284         spin_unlock(&cli->cl_mod_rpcs_lock);
2285
2286         return 0;
2287 }
2288 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2289
2290 /* The number of modify RPCs sent in parallel is limited
2291  * because the server has a finite number of slots per client to
2292  * store request result and ensure reply reconstruction when needed.
2293  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2294  * that takes into account server limit and cl_max_rpcs_in_flight
2295  * value.
2296  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2297  * one close request is allowed above the maximum.
2298  */
2299 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2300                                                  bool close_req)
2301 {
2302         bool avail;
2303
2304         /* A slot is available if
2305          * - number of modify RPCs in flight is less than the max
2306          * - it's a close RPC and no other close request is in flight
2307          */
2308         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2309                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2310
2311         return avail;
2312 }
2313
2314 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2315                                          bool close_req)
2316 {
2317         bool avail;
2318
2319         spin_lock(&cli->cl_mod_rpcs_lock);
2320         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2321         spin_unlock(&cli->cl_mod_rpcs_lock);
2322         return avail;
2323 }
2324
2325 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2326 {
2327         if (it != NULL &&
2328             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2329              it->it_op == IT_READDIR ||
2330              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2331                         return true;
2332         return false;
2333 }
2334
2335 /* Get a modify RPC slot from the obd client @cli according
2336  * to the kind of operation @opc that is going to be sent
2337  * and the intent @it of the operation if it applies.
2338  * If the maximum number of modify RPCs in flight is reached
2339  * the thread is put to sleep.
2340  * Returns the tag to be set in the request message. Tag 0
2341  * is reserved for non-modifying requests.
2342  */
2343 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2344                            struct lookup_intent *it)
2345 {
2346         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2347         bool                    close_req = false;
2348         __u16                   i, max;
2349
2350         /* read-only metadata RPCs don't consume a slot on MDT
2351          * for reply reconstruction
2352          */
2353         if (obd_skip_mod_rpc_slot(it))
2354                 return 0;
2355
2356         if (opc == MDS_CLOSE)
2357                 close_req = true;
2358
2359         do {
2360                 spin_lock(&cli->cl_mod_rpcs_lock);
2361                 max = cli->cl_max_mod_rpcs_in_flight;
2362                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2363                         /* there is a slot available */
2364                         cli->cl_mod_rpcs_in_flight++;
2365                         if (close_req)
2366                                 cli->cl_close_rpcs_in_flight++;
2367                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2368                                          cli->cl_mod_rpcs_in_flight);
2369                         /* find a free tag */
2370                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2371                                                 max + 1);
2372                         LASSERT(i < OBD_MAX_RIF_MAX);
2373                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2374                         spin_unlock(&cli->cl_mod_rpcs_lock);
2375                         /* tag 0 is reserved for non-modify RPCs */
2376                         return i + 1;
2377                 }
2378                 spin_unlock(&cli->cl_mod_rpcs_lock);
2379
2380                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2381                        "opc %u, max %hu\n",
2382                        cli->cl_import->imp_obd->obd_name, opc, max);
2383
2384                 l_wait_event_exclusive(cli->cl_mod_rpcs_waitq,
2385                                        obd_mod_rpc_slot_avail(cli, close_req),
2386                                        &lwi);
2387         } while (true);
2388 }
2389 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2390
2391 /* Put a modify RPC slot from the obd client @cli according
2392  * to the kind of operation @opc that has been sent and the
2393  * intent @it of the operation if it applies.
2394  */
2395 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2396                           struct lookup_intent *it, __u16 tag)
2397 {
2398         bool                    close_req = false;
2399
2400         if (obd_skip_mod_rpc_slot(it))
2401                 return;
2402
2403         if (opc == MDS_CLOSE)
2404                 close_req = true;
2405
2406         spin_lock(&cli->cl_mod_rpcs_lock);
2407         cli->cl_mod_rpcs_in_flight--;
2408         if (close_req)
2409                 cli->cl_close_rpcs_in_flight--;
2410         /* release the tag in the bitmap */
2411         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2412         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2413         spin_unlock(&cli->cl_mod_rpcs_lock);
2414         wake_up(&cli->cl_mod_rpcs_waitq);
2415 }
2416 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2417