Whamcloud - gitweb
1325398599bd7c223b502327ded9fcd188d20d01
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
/* Protects the obd_types list (taken around every traversal/modification). */
static DEFINE_SPINLOCK(obd_types_lock);
/* List of registered obd types, linked through obd_type::typ_chain. */
static LIST_HEAD(obd_types);
/* Protects the obd_devs[] table; readers take it shared, writers exclusive. */
DEFINE_RWLOCK(obd_dev_lock);
/* Table of registered devices; a device's slot index is its obd_minor. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache backing struct obd_device; created in obd_init_caches(). */
static struct kmem_cache *obd_device_cachep;

/* NOTE(review): not used in this part of the file; presumably the workqueue
 * that processes zombie exports/imports -- confirm against its users. */
static struct workqueue_struct *zombie_wq;

/* Forward declarations for helpers defined later in this file. */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

/* NOTE(review): stale-export bookkeeping (list, its lock, and a counter);
 * the users are outside this part of the file. */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);

/* Exported function pointer; it is not assigned anywhere in this part of
 * the file. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         struct list_head *tmp;
102         struct obd_type *type;
103
104         spin_lock(&obd_types_lock);
105         list_for_each(tmp, &obd_types) {
106                 type = list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124
125                 if (strcmp(modname, "obdfilter") == 0)
126                         modname = "ofd";
127
128                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129                         modname = LUSTRE_OSP_NAME;
130
131                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132                         modname = LUSTRE_MDT_NAME;
133
134                 if (!request_module("%s", modname)) {
135                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136                         type = class_search_type(name);
137                 } else {
138                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139                                            modname);
140                 }
141         }
142 #endif
143         if (type) {
144                 spin_lock(&type->obd_type_lock);
145                 type->typ_refcnt++;
146                 try_module_get(type->typ_dt_ops->o_owner);
147                 spin_unlock(&type->obd_type_lock);
148         }
149         return type;
150 }
151
152 void class_put_type(struct obd_type *type)
153 {
154         LASSERT(type);
155         spin_lock(&type->obd_type_lock);
156         type->typ_refcnt--;
157         module_put(type->typ_dt_ops->o_owner);
158         spin_unlock(&type->obd_type_lock);
159 }
160
/* kobject release callback: frees the kobject itself. */
static void class_sysfs_release(struct kobject *kobj)
{
        OBD_FREE_PTR(kobj);
}
165
/* kobj_type used for the kobjects created by class_setup_tunables():
 * attributes go through lustre_sysfs_ops and the kobject memory is freed
 * by class_sysfs_release(). */
static struct kobj_type class_ktype = {
        .sysfs_ops      = &lustre_sysfs_ops,
        .release        = class_sysfs_release,
};
170
171 struct kobject *class_setup_tunables(const char *name)
172 {
173         struct kobject *kobj;
174         int rc;
175
176 #ifdef HAVE_SERVER_SUPPORT
177         kobj = kset_find_obj(lustre_kset, name);
178         if (kobj)
179                 return kobj;
180 #endif
181         OBD_ALLOC(kobj, sizeof(*kobj));
182         if (!kobj)
183                 return ERR_PTR(-ENOMEM);
184
185         kobj->kset = lustre_kset;
186         kobject_init(kobj, &class_ktype);
187         rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
188         if (rc) {
189                 kobject_put(kobj);
190                 return ERR_PTR(rc);
191         }
192         return kobj;
193 }
194 EXPORT_SYMBOL(class_setup_tunables);
195
/* Exclusive upper bound on the length of an obd type name; asserted in
 * class_register_type(). */
#define CLASS_MAX_NAME 1024

/**
 * Register a new obd type.
 *
 * Allocates an obd_type, copies the operation tables and the name into it,
 * creates its procfs/debugfs/sysfs entries, initializes the optional lu
 * device type and finally links the type into obd_types.
 *
 * \param[in] dt_ops       obd operations; copied into the new type
 * \param[in] md_ops       metadata operations; optional, copied when set
 * \param[in] enable_proc  when true (and CONFIG_PROC_FS), register a procfs
 *                         directory for the type
 * \param[in] vars         passed through to lprocfs_register()
 * \param[in] name         type name; must be shorter than CLASS_MAX_NAME
 * \param[in] ldt          optional lu device type, passed to
 *                         lu_device_type_init()
 *
 * \retval 0        success
 * \retval -EEXIST  a type with this name is already registered
 * \retval -ENOMEM  allocation failure
 * \retval negative error returned by one of the registration helpers
 */
int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
                        bool enable_proc, struct lprocfs_vars *vars,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
#ifdef HAVE_SERVER_SUPPORT
        struct qstr dname;
#endif /* HAVE_SERVER_SUPPORT */
        int rc = 0;

        ENTRY;
        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        if (class_search_type(name)) {
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        /* preset rc so every allocation failure below can share it */
        rc = -ENOMEM;
        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(rc);

        /* allocate all three, then check them together; the failed: path
         * frees whichever of them succeeded */
        OBD_ALLOC_PTR(type->typ_dt_ops);
        OBD_ALLOC_PTR(type->typ_md_ops);
        OBD_ALLOC(type->typ_name, strlen(name) + 1);

        if (type->typ_dt_ops == NULL ||
            type->typ_md_ops == NULL ||
            type->typ_name == NULL)
                GOTO (failed, rc);

        *(type->typ_dt_ops) = *dt_ops;
        /* md_ops is optional */
        if (md_ops)
                *(type->typ_md_ops) = *md_ops;
        /* typ_name was sized strlen(name) + 1 above, so this fits */
        strcpy(type->typ_name, name);
        spin_lock_init(&type->obd_type_lock);

#ifdef CONFIG_PROC_FS
        if (enable_proc) {
                type->typ_procroot = lprocfs_register(type->typ_name,
                                                      proc_lustre_root,
                                                      vars, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        /* clear it so failed: does not try to remove it */
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
#ifdef HAVE_SERVER_SUPPORT
        /* If a debugfs directory with this name already exists, reuse it
         * instead of registering a new one.  typ_sym_filter marks the entry
         * so the cleanup paths do not remove it. */
        dname.name = name;
        dname.len = strlen(dname.name);
        dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
                                       dname.len);
        type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
        if (type->typ_debugfs_entry) {
                dput(type->typ_debugfs_entry);
                type->typ_sym_filter = true;
                goto dir_exist;
        }
#endif /* HAVE_SERVER_SUPPORT */

        type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
                                                    debugfs_lustre_root,
                                                    NULL, type);
        if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
                /* a NULL result is reported as -ENOMEM */
                rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
                                             : -ENOMEM;
                type->typ_debugfs_entry = NULL;
                GOTO(failed, rc);
        }
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif
        type->typ_kobj = class_setup_tunables(type->typ_name);
        if (IS_ERR(type->typ_kobj))
                GOTO(failed, rc = PTR_ERR(type->typ_kobj));

        if (ldt) {
                type->typ_lu = ldt;
                rc = lu_device_type_init(ldt);
                if (rc) {
                        /* drop the kobject obtained just above */
                        kobject_put(type->typ_kobj);
                        GOTO(failed, rc);
                }
        }

        /* publish the fully set up type */
        spin_lock(&obd_types_lock);
        list_add(&type->typ_chain, &obd_types);
        spin_unlock(&obd_types_lock);

        RETURN(0);

failed:
#ifdef HAVE_SERVER_SUPPORT
        /* a reused debugfs directory is not ours to remove */
        if (type->typ_sym_filter)
                type->typ_debugfs_entry = NULL;
#endif
        if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
                ldebugfs_remove(&type->typ_debugfs_entry);
        if (type->typ_name != NULL) {
#ifdef CONFIG_PROC_FS
                /* the procfs entry is removed by name, so this must happen
                 * before typ_name is freed */
                if (type->typ_procroot != NULL)
                        remove_proc_subtree(type->typ_name, proc_lustre_root);
#endif
                OBD_FREE(type->typ_name, strlen(name) + 1);
        }
        if (type->typ_md_ops != NULL)
                OBD_FREE_PTR(type->typ_md_ops);
        if (type->typ_dt_ops != NULL)
                OBD_FREE_PTR(type->typ_dt_ops);
        OBD_FREE(type, sizeof(*type));
        RETURN(rc);
}
EXPORT_SYMBOL(class_register_type);
316
317 int class_unregister_type(const char *name)
318 {
319         struct obd_type *type = class_search_type(name);
320         ENTRY;
321
322         if (!type) {
323                 CERROR("unknown obd type\n");
324                 RETURN(-EINVAL);
325         }
326
327         if (type->typ_refcnt) {
328                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
329                 /* This is a bad situation, let's make the best of it */
330                 /* Remove ops, but leave the name for debugging */
331                 OBD_FREE_PTR(type->typ_dt_ops);
332                 OBD_FREE_PTR(type->typ_md_ops);
333                 RETURN(-EBUSY);
334         }
335
336         kobject_put(type->typ_kobj);
337
338         /* we do not use type->typ_procroot as for compatibility purposes
339          * other modules can share names (i.e. lod can use lov entry). so
340          * we can't reference pointer as it can get invalided when another
341          * module removes the entry */
342 #ifdef CONFIG_PROC_FS
343         if (type->typ_procroot != NULL)
344                 remove_proc_subtree(type->typ_name, proc_lustre_root);
345         if (type->typ_procsym != NULL)
346                 lprocfs_remove(&type->typ_procsym);
347 #endif
348 #ifdef HAVE_SERVER_SUPPORT
349         if (type->typ_sym_filter)
350                 type->typ_debugfs_entry = NULL;
351 #endif
352         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
353                 ldebugfs_remove(&type->typ_debugfs_entry);
354
355         if (type->typ_lu)
356                 lu_device_type_fini(type->typ_lu);
357
358         spin_lock(&obd_types_lock);
359         list_del(&type->typ_chain);
360         spin_unlock(&obd_types_lock);
361         OBD_FREE(type->typ_name, strlen(name) + 1);
362         if (type->typ_dt_ops != NULL)
363                 OBD_FREE_PTR(type->typ_dt_ops);
364         if (type->typ_md_ops != NULL)
365                 OBD_FREE_PTR(type->typ_md_ops);
366         OBD_FREE(type, sizeof(*type));
367         RETURN(0);
368 } /* class_unregister_type */
369 EXPORT_SYMBOL(class_unregister_type);
370
371 /**
372  * Create a new obd device.
373  *
374  * Allocate the new obd_device and initialize it.
375  *
376  * \param[in] type_name obd device type string.
377  * \param[in] name      obd device name.
378  * \param[in] uuid      obd device UUID
379  *
380  * \retval newdev         pointer to created obd_device
381  * \retval ERR_PTR(errno) on error
382  */
383 struct obd_device *class_newdev(const char *type_name, const char *name,
384                                 const char *uuid)
385 {
386         struct obd_device *newdev;
387         struct obd_type *type = NULL;
388         ENTRY;
389
390         if (strlen(name) >= MAX_OBD_NAME) {
391                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
392                 RETURN(ERR_PTR(-EINVAL));
393         }
394
395         type = class_get_type(type_name);
396         if (type == NULL){
397                 CERROR("OBD: unknown type: %s\n", type_name);
398                 RETURN(ERR_PTR(-ENODEV));
399         }
400
401         newdev = obd_device_alloc();
402         if (newdev == NULL) {
403                 class_put_type(type);
404                 RETURN(ERR_PTR(-ENOMEM));
405         }
406         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
407         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
408         newdev->obd_type = type;
409         newdev->obd_minor = -1;
410
411         rwlock_init(&newdev->obd_pool_lock);
412         newdev->obd_pool_limit = 0;
413         newdev->obd_pool_slv = 0;
414
415         INIT_LIST_HEAD(&newdev->obd_exports);
416         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
417         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
418         INIT_LIST_HEAD(&newdev->obd_exports_timed);
419         INIT_LIST_HEAD(&newdev->obd_nid_stats);
420         spin_lock_init(&newdev->obd_nid_lock);
421         spin_lock_init(&newdev->obd_dev_lock);
422         mutex_init(&newdev->obd_dev_mutex);
423         spin_lock_init(&newdev->obd_osfs_lock);
424         /* newdev->obd_osfs_age must be set to a value in the distant
425          * past to guarantee a fresh statfs is fetched on mount. */
426         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
427
428         /* XXX belongs in setup not attach  */
429         init_rwsem(&newdev->obd_observer_link_sem);
430         /* recovery data */
431         spin_lock_init(&newdev->obd_recovery_task_lock);
432         init_waitqueue_head(&newdev->obd_next_transno_waitq);
433         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
434         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
435         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
436         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
437         INIT_LIST_HEAD(&newdev->obd_evict_list);
438         INIT_LIST_HEAD(&newdev->obd_lwp_list);
439
440         llog_group_init(&newdev->obd_olg);
441         /* Detach drops this */
442         atomic_set(&newdev->obd_refcount, 1);
443         lu_ref_init(&newdev->obd_reference);
444         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
445
446         newdev->obd_conn_inprogress = 0;
447
448         strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
449
450         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
451                newdev->obd_name, newdev);
452
453         return newdev;
454 }
455
456 /**
457  * Free obd device.
458  *
459  * \param[in] obd obd_device to be freed
460  *
461  * \retval none
462  */
463 void class_free_dev(struct obd_device *obd)
464 {
465         struct obd_type *obd_type = obd->obd_type;
466
467         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
468                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
469         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
470                  "obd %p != obd_devs[%d] %p\n",
471                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
472         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
473                  "obd_refcount should be 0, not %d\n",
474                  atomic_read(&obd->obd_refcount));
475         LASSERT(obd_type != NULL);
476
477         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
478                obd->obd_name, obd->obd_type->typ_name);
479
480         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
481                          obd->obd_name, obd->obd_uuid.uuid);
482         if (obd->obd_stopping) {
483                 int err;
484
485                 /* If we're not stopping, we were never set up */
486                 err = obd_cleanup(obd);
487                 if (err)
488                         CERROR("Cleanup %s returned %d\n",
489                                 obd->obd_name, err);
490         }
491
492         obd_device_free(obd);
493
494         class_put_type(obd_type);
495 }
496
497 /**
498  * Unregister obd device.
499  *
500  * Free slot in obd_dev[] used by \a obd.
501  *
502  * \param[in] new_obd obd_device to be unregistered
503  *
504  * \retval none
505  */
506 void class_unregister_device(struct obd_device *obd)
507 {
508         write_lock(&obd_dev_lock);
509         if (obd->obd_minor >= 0) {
510                 LASSERT(obd_devs[obd->obd_minor] == obd);
511                 obd_devs[obd->obd_minor] = NULL;
512                 obd->obd_minor = -1;
513         }
514         write_unlock(&obd_dev_lock);
515 }
516
517 /**
518  * Register obd device.
519  *
520  * Find free slot in obd_devs[], fills it with \a new_obd.
521  *
522  * \param[in] new_obd obd_device to be registered
523  *
524  * \retval 0          success
525  * \retval -EEXIST    device with this name is registered
526  * \retval -EOVERFLOW obd_devs[] is full
527  */
528 int class_register_device(struct obd_device *new_obd)
529 {
530         int ret = 0;
531         int i;
532         int new_obd_minor = 0;
533         bool minor_assign = false;
534         bool retried = false;
535
536 again:
537         write_lock(&obd_dev_lock);
538         for (i = 0; i < class_devno_max(); i++) {
539                 struct obd_device *obd = class_num2obd(i);
540
541                 if (obd != NULL &&
542                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
543
544                         if (!retried) {
545                                 write_unlock(&obd_dev_lock);
546
547                                 /* the obd_device could be waited to be
548                                  * destroyed by the "obd_zombie_impexp_thread".
549                                  */
550                                 obd_zombie_barrier();
551                                 retried = true;
552                                 goto again;
553                         }
554
555                         CERROR("%s: already exists, won't add\n",
556                                obd->obd_name);
557                         /* in case we found a free slot before duplicate */
558                         minor_assign = false;
559                         ret = -EEXIST;
560                         break;
561                 }
562                 if (!minor_assign && obd == NULL) {
563                         new_obd_minor = i;
564                         minor_assign = true;
565                 }
566         }
567
568         if (minor_assign) {
569                 new_obd->obd_minor = new_obd_minor;
570                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
571                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
572                 obd_devs[new_obd_minor] = new_obd;
573         } else {
574                 if (ret == 0) {
575                         ret = -EOVERFLOW;
576                         CERROR("%s: all %u/%u devices used, increase "
577                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
578                                i, class_devno_max(), ret);
579                 }
580         }
581         write_unlock(&obd_dev_lock);
582
583         RETURN(ret);
584 }
585
586 static int class_name2dev_nolock(const char *name)
587 {
588         int i;
589
590         if (!name)
591                 return -1;
592
593         for (i = 0; i < class_devno_max(); i++) {
594                 struct obd_device *obd = class_num2obd(i);
595
596                 if (obd && strcmp(name, obd->obd_name) == 0) {
597                         /* Make sure we finished attaching before we give
598                            out any references */
599                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
600                         if (obd->obd_attached) {
601                                 return i;
602                         }
603                         break;
604                 }
605         }
606
607         return -1;
608 }
609
610 int class_name2dev(const char *name)
611 {
612         int i;
613
614         if (!name)
615                 return -1;
616
617         read_lock(&obd_dev_lock);
618         i = class_name2dev_nolock(name);
619         read_unlock(&obd_dev_lock);
620
621         return i;
622 }
623 EXPORT_SYMBOL(class_name2dev);
624
625 struct obd_device *class_name2obd(const char *name)
626 {
627         int dev = class_name2dev(name);
628
629         if (dev < 0 || dev > class_devno_max())
630                 return NULL;
631         return class_num2obd(dev);
632 }
633 EXPORT_SYMBOL(class_name2obd);
634
635 int class_uuid2dev_nolock(struct obd_uuid *uuid)
636 {
637         int i;
638
639         for (i = 0; i < class_devno_max(); i++) {
640                 struct obd_device *obd = class_num2obd(i);
641
642                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
643                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
644                         return i;
645                 }
646         }
647
648         return -1;
649 }
650
651 int class_uuid2dev(struct obd_uuid *uuid)
652 {
653         int i;
654
655         read_lock(&obd_dev_lock);
656         i = class_uuid2dev_nolock(uuid);
657         read_unlock(&obd_dev_lock);
658
659         return i;
660 }
661 EXPORT_SYMBOL(class_uuid2dev);
662
663 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
664 {
665         int dev = class_uuid2dev(uuid);
666         if (dev < 0)
667                 return NULL;
668         return class_num2obd(dev);
669 }
670 EXPORT_SYMBOL(class_uuid2obd);
671
672 /**
673  * Get obd device from ::obd_devs[]
674  *
675  * \param num [in] array index
676  *
677  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
678  *         otherwise return the obd device there.
679  */
680 struct obd_device *class_num2obd(int num)
681 {
682         struct obd_device *obd = NULL;
683
684         if (num < class_devno_max()) {
685                 obd = obd_devs[num];
686                 if (obd == NULL)
687                         return NULL;
688
689                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
690                          "%p obd_magic %08x != %08x\n",
691                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
692                 LASSERTF(obd->obd_minor == num,
693                          "%p obd_minor %0d != %0d\n",
694                          obd, obd->obd_minor, num);
695         }
696
697         return obd;
698 }
699
700 /**
701  * Find obd in obd_dev[] by name or uuid.
702  *
703  * Increment obd's refcount if found.
704  *
705  * \param[in] str obd name or uuid
706  *
707  * \retval NULL    if not found
708  * \retval target  pointer to found obd_device
709  */
710 struct obd_device *class_dev_by_str(const char *str)
711 {
712         struct obd_device *target = NULL;
713         struct obd_uuid tgtuuid;
714         int rc;
715
716         obd_str2uuid(&tgtuuid, str);
717
718         read_lock(&obd_dev_lock);
719         rc = class_uuid2dev_nolock(&tgtuuid);
720         if (rc < 0)
721                 rc = class_name2dev_nolock(str);
722
723         if (rc >= 0)
724                 target = class_num2obd(rc);
725
726         if (target != NULL)
727                 class_incref(target, "find", current);
728         read_unlock(&obd_dev_lock);
729
730         RETURN(target);
731 }
732 EXPORT_SYMBOL(class_dev_by_str);
733
734 /**
735  * Get obd devices count. Device in any
736  *    state are counted
737  * \retval obd device count
738  */
739 int get_devices_count(void)
740 {
741         int index, max_index = class_devno_max(), dev_count = 0;
742
743         read_lock(&obd_dev_lock);
744         for (index = 0; index <= max_index; index++) {
745                 struct obd_device *obd = class_num2obd(index);
746                 if (obd != NULL)
747                         dev_count++;
748         }
749         read_unlock(&obd_dev_lock);
750
751         return dev_count;
752 }
753 EXPORT_SYMBOL(get_devices_count);
754
755 void class_obd_list(void)
756 {
757         char *status;
758         int i;
759
760         read_lock(&obd_dev_lock);
761         for (i = 0; i < class_devno_max(); i++) {
762                 struct obd_device *obd = class_num2obd(i);
763
764                 if (obd == NULL)
765                         continue;
766                 if (obd->obd_stopping)
767                         status = "ST";
768                 else if (obd->obd_set_up)
769                         status = "UP";
770                 else if (obd->obd_attached)
771                         status = "AT";
772                 else
773                         status = "--";
774                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
775                          i, status, obd->obd_type->typ_name,
776                          obd->obd_name, obd->obd_uuid.uuid,
777                          atomic_read(&obd->obd_refcount));
778         }
779         read_unlock(&obd_dev_lock);
780         return;
781 }
782
783 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
784    specified, then only the client with that uuid is returned,
785    otherwise any client connected to the tgt is returned. */
786 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
787                                           const char * typ_name,
788                                           struct obd_uuid *grp_uuid)
789 {
790         int i;
791
792         read_lock(&obd_dev_lock);
793         for (i = 0; i < class_devno_max(); i++) {
794                 struct obd_device *obd = class_num2obd(i);
795
796                 if (obd == NULL)
797                         continue;
798                 if ((strncmp(obd->obd_type->typ_name, typ_name,
799                              strlen(typ_name)) == 0)) {
800                         if (obd_uuid_equals(tgt_uuid,
801                                             &obd->u.cli.cl_target_uuid) &&
802                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
803                                                          &obd->obd_uuid) : 1)) {
804                                 read_unlock(&obd_dev_lock);
805                                 return obd;
806                         }
807                 }
808         }
809         read_unlock(&obd_dev_lock);
810
811         return NULL;
812 }
813 EXPORT_SYMBOL(class_find_client_obd);
814
815 /* Iterate the obd_device list looking devices have grp_uuid. Start
816    searching at *next, and if a device is found, the next index to look
817    at is saved in *next. If next is NULL, then the first matching device
818    will always be returned. */
819 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
820 {
821         int i;
822
823         if (next == NULL)
824                 i = 0;
825         else if (*next >= 0 && *next < class_devno_max())
826                 i = *next;
827         else
828                 return NULL;
829
830         read_lock(&obd_dev_lock);
831         for (; i < class_devno_max(); i++) {
832                 struct obd_device *obd = class_num2obd(i);
833
834                 if (obd == NULL)
835                         continue;
836                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
837                         if (next != NULL)
838                                 *next = i+1;
839                         read_unlock(&obd_dev_lock);
840                         return obd;
841                 }
842         }
843         read_unlock(&obd_dev_lock);
844
845         return NULL;
846 }
847 EXPORT_SYMBOL(class_devices_in_group);
848
849 /**
850  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
851  * adjust sptlrpc settings accordingly.
852  */
853 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
854 {
855         struct obd_device  *obd;
856         const char         *type;
857         int                 i, rc = 0, rc2;
858
859         LASSERT(namelen > 0);
860
861         read_lock(&obd_dev_lock);
862         for (i = 0; i < class_devno_max(); i++) {
863                 obd = class_num2obd(i);
864
865                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
866                         continue;
867
868                 /* only notify mdc, osc, osp, lwp, mdt, ost
869                  * because only these have a -sptlrpc llog */
870                 type = obd->obd_type->typ_name;
871                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
872                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
873                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
874                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
875                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
876                     strcmp(type, LUSTRE_OST_NAME) != 0)
877                         continue;
878
879                 if (strncmp(obd->obd_name, fsname, namelen))
880                         continue;
881
882                 class_incref(obd, __FUNCTION__, obd);
883                 read_unlock(&obd_dev_lock);
884                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
885                                          sizeof(KEY_SPTLRPC_CONF),
886                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
887                 rc = rc ? rc : rc2;
888                 class_decref(obd, __FUNCTION__, obd);
889                 read_lock(&obd_dev_lock);
890         }
891         read_unlock(&obd_dev_lock);
892         return rc;
893 }
894 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
895
896 void obd_cleanup_caches(void)
897 {
898         ENTRY;
899         if (obd_device_cachep) {
900                 kmem_cache_destroy(obd_device_cachep);
901                 obd_device_cachep = NULL;
902         }
903
904         EXIT;
905 }
906
907 int obd_init_caches(void)
908 {
909         int rc;
910         ENTRY;
911
912         LASSERT(obd_device_cachep == NULL);
913         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
914                                               sizeof(struct obd_device),
915                                               0, 0, NULL);
916         if (!obd_device_cachep)
917                 GOTO(out, rc = -ENOMEM);
918
919         RETURN(0);
920 out:
921         obd_cleanup_caches();
922         RETURN(rc);
923 }
924
925 /* map connection to client */
926 struct obd_export *class_conn2export(struct lustre_handle *conn)
927 {
928         struct obd_export *export;
929         ENTRY;
930
931         if (!conn) {
932                 CDEBUG(D_CACHE, "looking for null handle\n");
933                 RETURN(NULL);
934         }
935
936         if (conn->cookie == -1) {  /* this means assign a new connection */
937                 CDEBUG(D_CACHE, "want a new connection\n");
938                 RETURN(NULL);
939         }
940
941         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
942         export = class_handle2object(conn->cookie, NULL);
943         RETURN(export);
944 }
945 EXPORT_SYMBOL(class_conn2export);
946
947 struct obd_device *class_exp2obd(struct obd_export *exp)
948 {
949         if (exp)
950                 return exp->exp_obd;
951         return NULL;
952 }
953 EXPORT_SYMBOL(class_exp2obd);
954
955 struct obd_device *class_conn2obd(struct lustre_handle *conn)
956 {
957         struct obd_export *export;
958         export = class_conn2export(conn);
959         if (export) {
960                 struct obd_device *obd = export->exp_obd;
961                 class_export_put(export);
962                 return obd;
963         }
964         return NULL;
965 }
966
967 struct obd_import *class_exp2cliimp(struct obd_export *exp)
968 {
969         struct obd_device *obd = exp->exp_obd;
970         if (obd == NULL)
971                 return NULL;
972         return obd->u.cli.cl_import;
973 }
974 EXPORT_SYMBOL(class_exp2cliimp);
975
976 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
977 {
978         struct obd_device *obd = class_conn2obd(conn);
979         if (obd == NULL)
980                 return NULL;
981         return obd->u.cli.cl_import;
982 }
983
984 /* Export management functions */
/* Final teardown of an export whose refcount has dropped to zero.
 *
 * Releases the export's connection (if it has one), asserts that no
 * replies or queued requests are still attached, calls
 * obd_destroy_export(), drops the obd reference held by non-self exports
 * and finally frees the export via OBD_FREE_RCU(). */
static void class_export_destroy(struct obd_export *exp)
{
	struct obd_device *obd = exp->exp_obd;
	ENTRY;

	LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
	LASSERT(obd != NULL);

	CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
	       exp->exp_client_uuid.uuid, obd->obd_name);

	/* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
	if (exp->exp_connection)
		ptlrpc_put_connection_superhack(exp->exp_connection);

	/* nothing may still be queued on the export at this point */
	LASSERT(list_empty(&exp->exp_outstanding_replies));
	LASSERT(list_empty(&exp->exp_uncommitted_replies));
	LASSERT(list_empty(&exp->exp_req_replay_queue));
	LASSERT(list_empty(&exp->exp_hp_rpcs));
	obd_destroy_export(exp);
	/* self export doesn't hold a reference to an obd, although it
	 * exists until freeing of the obd */
	if (exp != obd->obd_self_export)
		class_decref(obd, "export", exp);

	OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
	EXIT;
}
1013
/* portals_handle_ops addref hook: take a reference on the export that
 * the handle points at. */
static void export_handle_addref(void *export)
{
	struct obd_export *exp = export;

	class_export_get(exp);
}
1018
/* Handle operations registered for export handles via class_handle_hash():
 * only an addref hook is supplied, no free hook. */
static struct portals_handle_ops export_handle_ops = {
	.hop_addref = export_handle_addref,
	.hop_free   = NULL,
};
1023
/* Take one reference on @exp and return it, so the call can be chained.
 * The refcount printed in the debug message is read separately after the
 * increment. */
struct obd_export *class_export_get(struct obd_export *exp)
{
	atomic_inc(&exp->exp_refcount);
	CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
	       atomic_read(&exp->exp_refcount));
	return exp;
}
1031 EXPORT_SYMBOL(class_export_get);
1032
/* Drop one reference on @exp.
 *
 * On the final put the nid-stat reference is released; the self export
 * is then destroyed inline together with its obd device, while any other
 * export is handed to obd_zombie_export_add() for destruction.
 * @exp must not be NULL and must hold at least one reference. */
void class_export_put(struct obd_export *exp)
{
	LASSERT(exp != NULL);
	LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
	CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
	       atomic_read(&exp->exp_refcount) - 1);

	if (atomic_dec_and_test(&exp->exp_refcount)) {
		/* read the obd now; exp may be freed below */
		struct obd_device *obd = exp->exp_obd;

		CDEBUG(D_IOCTL, "final put %p/%s\n",
		       exp, exp->exp_client_uuid.uuid);

		/* release nid stat reference */
		lprocfs_exp_cleanup(exp);

		if (exp == obd->obd_self_export) {
			/* self export should be destroyed without
			 * zombie thread as it doesn't hold a
			 * reference to obd and doesn't hold any
			 * resources */
			class_export_destroy(exp);
			/* self export is destroyed, no class
			 * references exist and it is safe to free
			 * obd */
			class_free_dev(obd);
		} else {
			LASSERT(!list_empty(&exp->exp_obd_chain));
			obd_zombie_export_add(exp);
		}

	}
}
1066 EXPORT_SYMBOL(class_export_put);
1067
1068 static void obd_zombie_exp_cull(struct work_struct *ws)
1069 {
1070         struct obd_export *export;
1071
1072         export = container_of(ws, struct obd_export, exp_zombie_work);
1073         class_export_destroy(export);
1074 }
1075
1076 /* Creates a new export, adds it to the hash table, and returns a
1077  * pointer to it. The refcount is 2: one for the hash reference, and
1078  * one for the pointer returned by this function. */
1079 struct obd_export *__class_new_export(struct obd_device *obd,
1080                                       struct obd_uuid *cluuid, bool is_self)
1081 {
1082         struct obd_export *export;
1083         struct cfs_hash *hash = NULL;
1084         int rc = 0;
1085         ENTRY;
1086
1087         OBD_ALLOC_PTR(export);
1088         if (!export)
1089                 return ERR_PTR(-ENOMEM);
1090
1091         export->exp_conn_cnt = 0;
1092         export->exp_lock_hash = NULL;
1093         export->exp_flock_hash = NULL;
1094         /* 2 = class_handle_hash + last */
1095         atomic_set(&export->exp_refcount, 2);
1096         atomic_set(&export->exp_rpc_count, 0);
1097         atomic_set(&export->exp_cb_count, 0);
1098         atomic_set(&export->exp_locks_count, 0);
1099 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1100         INIT_LIST_HEAD(&export->exp_locks_list);
1101         spin_lock_init(&export->exp_locks_list_guard);
1102 #endif
1103         atomic_set(&export->exp_replay_count, 0);
1104         export->exp_obd = obd;
1105         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1106         spin_lock_init(&export->exp_uncommitted_replies_lock);
1107         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1108         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1109         INIT_LIST_HEAD(&export->exp_handle.h_link);
1110         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1111         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1112         class_handle_hash(&export->exp_handle, &export_handle_ops);
1113         export->exp_last_request_time = ktime_get_real_seconds();
1114         spin_lock_init(&export->exp_lock);
1115         spin_lock_init(&export->exp_rpc_lock);
1116         INIT_HLIST_NODE(&export->exp_uuid_hash);
1117         INIT_HLIST_NODE(&export->exp_nid_hash);
1118         INIT_HLIST_NODE(&export->exp_gen_hash);
1119         spin_lock_init(&export->exp_bl_list_lock);
1120         INIT_LIST_HEAD(&export->exp_bl_list);
1121         INIT_LIST_HEAD(&export->exp_stale_list);
1122         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1123
1124         export->exp_sp_peer = LUSTRE_SP_ANY;
1125         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1126         export->exp_client_uuid = *cluuid;
1127         obd_init_export(export);
1128
1129         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1130                 spin_lock(&obd->obd_dev_lock);
1131                 /* shouldn't happen, but might race */
1132                 if (obd->obd_stopping)
1133                         GOTO(exit_unlock, rc = -ENODEV);
1134
1135                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1136                 if (hash == NULL)
1137                         GOTO(exit_unlock, rc = -ENODEV);
1138                 spin_unlock(&obd->obd_dev_lock);
1139
1140                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1141                 if (rc != 0) {
1142                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1143                                       obd->obd_name, cluuid->uuid, rc);
1144                         GOTO(exit_err, rc = -EALREADY);
1145                 }
1146         }
1147
1148         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1149         spin_lock(&obd->obd_dev_lock);
1150         if (obd->obd_stopping) {
1151                 if (hash)
1152                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1153                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1154         }
1155
1156         if (!is_self) {
1157                 class_incref(obd, "export", export);
1158                 list_add_tail(&export->exp_obd_chain_timed,
1159                               &obd->obd_exports_timed);
1160                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1161                 obd->obd_num_exports++;
1162         } else {
1163                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1164                 INIT_LIST_HEAD(&export->exp_obd_chain);
1165         }
1166         spin_unlock(&obd->obd_dev_lock);
1167         if (hash)
1168                 cfs_hash_putref(hash);
1169         RETURN(export);
1170
1171 exit_unlock:
1172         spin_unlock(&obd->obd_dev_lock);
1173 exit_err:
1174         if (hash)
1175                 cfs_hash_putref(hash);
1176         class_handle_unhash(&export->exp_handle);
1177         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1178         obd_destroy_export(export);
1179         OBD_FREE_PTR(export);
1180         return ERR_PTR(rc);
1181 }
1182
/* Create a regular (non-self) export of @obd for client @uuid.
 * Returns the export or an ERR_PTR(), see __class_new_export(). */
struct obd_export *class_new_export(struct obd_device *obd,
				    struct obd_uuid *uuid)
{
	return __class_new_export(obd, uuid, false);
}
1188 EXPORT_SYMBOL(class_new_export);
1189
/* Create the self export of @obd: same as class_new_export() but with
 * is_self set, so no obd reference is taken and the export is not linked
 * onto the obd's export lists. Returns the export or an ERR_PTR(). */
struct obd_export *class_new_export_self(struct obd_device *obd,
					 struct obd_uuid *uuid)
{
	return __class_new_export(obd, uuid, true);
}
1195
1196 void class_unlink_export(struct obd_export *exp)
1197 {
1198         class_handle_unhash(&exp->exp_handle);
1199
1200         if (exp->exp_obd->obd_self_export == exp) {
1201                 class_export_put(exp);
1202                 return;
1203         }
1204
1205         spin_lock(&exp->exp_obd->obd_dev_lock);
1206         /* delete an uuid-export hashitem from hashtables */
1207         if (!hlist_unhashed(&exp->exp_uuid_hash))
1208                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1209                              &exp->exp_client_uuid,
1210                              &exp->exp_uuid_hash);
1211
1212 #ifdef HAVE_SERVER_SUPPORT
1213         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1214                 struct tg_export_data   *ted = &exp->exp_target_data;
1215                 struct cfs_hash         *hash;
1216
1217                 /* Because obd_gen_hash will not be released until
1218                  * class_cleanup(), so hash should never be NULL here */
1219                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1220                 LASSERT(hash != NULL);
1221                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1222                              &exp->exp_gen_hash);
1223                 cfs_hash_putref(hash);
1224         }
1225 #endif /* HAVE_SERVER_SUPPORT */
1226
1227         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1228         list_del_init(&exp->exp_obd_chain_timed);
1229         exp->exp_obd->obd_num_exports--;
1230         spin_unlock(&exp->exp_obd->obd_dev_lock);
1231         atomic_inc(&obd_stale_export_num);
1232
1233         /* A reference is kept by obd_stale_exports list */
1234         obd_stale_export_put(exp);
1235 }
1236 EXPORT_SYMBOL(class_unlink_export);
1237
1238 /* Import management functions */
/* Final teardown of an import whose refcount has dropped to zero.
 *
 * Releases the import's current connection and every entry on
 * imp_conn_list, drops the obd reference taken under the "import" scope
 * and frees the import via OBD_FREE_RCU(). The security context
 * (imp_sec) must already be gone. */
static void class_import_destroy(struct obd_import *imp)
{
	ENTRY;

	CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
		imp->imp_obd->obd_name);

	LASSERT_ATOMIC_ZERO(&imp->imp_refcount);

	ptlrpc_put_connection_superhack(imp->imp_connection);

	/* drain the list of known connections, freeing each entry */
	while (!list_empty(&imp->imp_conn_list)) {
		struct obd_import_conn *imp_conn;

		imp_conn = list_entry(imp->imp_conn_list.next,
				      struct obd_import_conn, oic_item);
		list_del_init(&imp_conn->oic_item);
		ptlrpc_put_connection_superhack(imp_conn->oic_conn);
		OBD_FREE(imp_conn, sizeof(*imp_conn));
	}

	LASSERT(imp->imp_sec == NULL);
	class_decref(imp->imp_obd, "import", imp);
	OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
	EXIT;
}
1265
/* portals_handle_ops addref hook: take a reference on the import that
 * the handle points at. */
static void import_handle_addref(void *import)
{
	struct obd_import *imp = import;

	class_import_get(imp);
}
1270
/* Handle operations registered for import handles via class_handle_hash():
 * only an addref hook is supplied, no free hook. */
static struct portals_handle_ops import_handle_ops = {
	.hop_addref = import_handle_addref,
	.hop_free   = NULL,
};
1275
/* Take one reference on @import and return it, so the call can be
 * chained. The refcount printed in the debug message is read separately
 * after the increment. */
struct obd_import *class_import_get(struct obd_import *import)
{
	atomic_inc(&import->imp_refcount);
	CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
	       atomic_read(&import->imp_refcount),
	       import->imp_obd->obd_name);
	return import;
}
1284 EXPORT_SYMBOL(class_import_get);
1285
1286 void class_import_put(struct obd_import *imp)
1287 {
1288         ENTRY;
1289
1290         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1291
1292         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1293                atomic_read(&imp->imp_refcount) - 1,
1294                imp->imp_obd->obd_name);
1295
1296         if (atomic_dec_and_test(&imp->imp_refcount)) {
1297                 CDEBUG(D_INFO, "final put import %p\n", imp);
1298                 obd_zombie_import_add(imp);
1299         }
1300
1301         /* catch possible import put race */
1302         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1303         EXIT;
1304 }
1305 EXPORT_SYMBOL(class_import_put);
1306
1307 static void init_imp_at(struct imp_at *at) {
1308         int i;
1309         at_init(&at->iat_net_latency, 0, 0);
1310         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1311                 /* max service estimates are tracked on the server side, so
1312                    don't use the AT history here, just use the last reported
1313                    val. (But keep hist for proc histogram, worst_ever) */
1314                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1315                         AT_FLG_NOHIST);
1316         }
1317 }
1318
1319 static void obd_zombie_imp_cull(struct work_struct *ws)
1320 {
1321         struct obd_import *import;
1322
1323         import = container_of(ws, struct obd_import, imp_zombie_work);
1324         class_import_destroy(import);
1325 }
1326
1327 struct obd_import *class_new_import(struct obd_device *obd)
1328 {
1329         struct obd_import *imp;
1330         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1331
1332         OBD_ALLOC(imp, sizeof(*imp));
1333         if (imp == NULL)
1334                 return NULL;
1335
1336         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1337         INIT_LIST_HEAD(&imp->imp_replay_list);
1338         INIT_LIST_HEAD(&imp->imp_sending_list);
1339         INIT_LIST_HEAD(&imp->imp_delayed_list);
1340         INIT_LIST_HEAD(&imp->imp_committed_list);
1341         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1342         imp->imp_known_replied_xid = 0;
1343         imp->imp_replay_cursor = &imp->imp_committed_list;
1344         spin_lock_init(&imp->imp_lock);
1345         imp->imp_last_success_conn = 0;
1346         imp->imp_state = LUSTRE_IMP_NEW;
1347         imp->imp_obd = class_incref(obd, "import", imp);
1348         mutex_init(&imp->imp_sec_mutex);
1349         init_waitqueue_head(&imp->imp_recovery_waitq);
1350         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1351
1352         if (curr_pid_ns->child_reaper)
1353                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1354         else
1355                 imp->imp_sec_refpid = 1;
1356
1357         atomic_set(&imp->imp_refcount, 2);
1358         atomic_set(&imp->imp_unregistering, 0);
1359         atomic_set(&imp->imp_inflight, 0);
1360         atomic_set(&imp->imp_replay_inflight, 0);
1361         atomic_set(&imp->imp_inval_count, 0);
1362         INIT_LIST_HEAD(&imp->imp_conn_list);
1363         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1364         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1365         init_imp_at(&imp->imp_at);
1366
1367         /* the default magic is V2, will be used in connect RPC, and
1368          * then adjusted according to the flags in request/reply. */
1369         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1370
1371         return imp;
1372 }
1373 EXPORT_SYMBOL(class_new_import);
1374
/* Start tearing down an import: remove it from the handle hash, bump
 * imp_generation under imp_lock and drop one reference. @import must be
 * a valid, non-poisoned pointer. */
void class_destroy_import(struct obd_import *import)
{
	LASSERT(import != NULL);
	LASSERT(import != LP_POISON);

	class_handle_unhash(&import->imp_handle);

	spin_lock(&import->imp_lock);
	import->imp_generation++;
	spin_unlock(&import->imp_lock);
	class_import_put(import);
}
1387 EXPORT_SYMBOL(class_destroy_import);
1388
1389 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1390
1391 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1392 {
1393         spin_lock(&exp->exp_locks_list_guard);
1394
1395         LASSERT(lock->l_exp_refs_nr >= 0);
1396
1397         if (lock->l_exp_refs_target != NULL &&
1398             lock->l_exp_refs_target != exp) {
1399                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1400                               exp, lock, lock->l_exp_refs_target);
1401         }
1402         if ((lock->l_exp_refs_nr ++) == 0) {
1403                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1404                 lock->l_exp_refs_target = exp;
1405         }
1406         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1407                lock, exp, lock->l_exp_refs_nr);
1408         spin_unlock(&exp->exp_locks_list_guard);
1409 }
1410 EXPORT_SYMBOL(__class_export_add_lock_ref);
1411
1412 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1413 {
1414         spin_lock(&exp->exp_locks_list_guard);
1415         LASSERT(lock->l_exp_refs_nr > 0);
1416         if (lock->l_exp_refs_target != exp) {
1417                 LCONSOLE_WARN("lock %p, "
1418                               "mismatching export pointers: %p, %p\n",
1419                               lock, lock->l_exp_refs_target, exp);
1420         }
1421         if (-- lock->l_exp_refs_nr == 0) {
1422                 list_del_init(&lock->l_exp_refs_link);
1423                 lock->l_exp_refs_target = NULL;
1424         }
1425         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1426                lock, exp, lock->l_exp_refs_nr);
1427         spin_unlock(&exp->exp_locks_list_guard);
1428 }
1429 EXPORT_SYMBOL(__class_export_del_lock_ref);
1430 #endif
1431
1432 /* A connection defines an export context in which preallocation can
1433    be managed. This releases the export pointer reference, and returns
1434    the export handle, so the export refcount is 1 when this function
1435    returns. */
1436 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1437                   struct obd_uuid *cluuid)
1438 {
1439         struct obd_export *export;
1440         LASSERT(conn != NULL);
1441         LASSERT(obd != NULL);
1442         LASSERT(cluuid != NULL);
1443         ENTRY;
1444
1445         export = class_new_export(obd, cluuid);
1446         if (IS_ERR(export))
1447                 RETURN(PTR_ERR(export));
1448
1449         conn->cookie = export->exp_handle.h_cookie;
1450         class_export_put(export);
1451
1452         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1453                cluuid->uuid, conn->cookie);
1454         RETURN(0);
1455 }
1456 EXPORT_SYMBOL(class_connect);
1457
1458 /* if export is involved in recovery then clean up related things */
1459 static void class_export_recovery_cleanup(struct obd_export *exp)
1460 {
1461         struct obd_device *obd = exp->exp_obd;
1462
1463         spin_lock(&obd->obd_recovery_task_lock);
1464         if (obd->obd_recovering) {
1465                 if (exp->exp_in_recovery) {
1466                         spin_lock(&exp->exp_lock);
1467                         exp->exp_in_recovery = 0;
1468                         spin_unlock(&exp->exp_lock);
1469                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1470                         atomic_dec(&obd->obd_connected_clients);
1471                 }
1472
1473                 /* if called during recovery then should update
1474                  * obd_stale_clients counter,
1475                  * lightweight exports are not counted */
1476                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1477                         exp->exp_obd->obd_stale_clients++;
1478         }
1479         spin_unlock(&obd->obd_recovery_task_lock);
1480
1481         spin_lock(&exp->exp_lock);
1482         /** Cleanup req replay fields */
1483         if (exp->exp_req_replay_needed) {
1484                 exp->exp_req_replay_needed = 0;
1485
1486                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1487                 atomic_dec(&obd->obd_req_replay_clients);
1488         }
1489
1490         /** Cleanup lock replay data */
1491         if (exp->exp_lock_replay_needed) {
1492                 exp->exp_lock_replay_needed = 0;
1493
1494                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1495                 atomic_dec(&obd->obd_lock_replay_clients);
1496         }
1497         spin_unlock(&exp->exp_lock);
1498 }
1499
/* Disconnect an export.
 *
 * This function removes 1-3 references from the export:
 * 1 - for the export pointer passed in,
 * and, if a disconnect is really needed,
 * 2 - for removing it from the hash,
 * 3 - in class_unlink_export().
 * The export passed to this function may be destroyed by the time it
 * returns.
 *
 * Returns -EINVAL for a NULL export, 0 otherwise (including when the
 * export had already been disconnected). */
int class_disconnect(struct obd_export *export)
{
	int already_disconnected;
	ENTRY;

	if (export == NULL) {
		CWARN("attempting to free NULL export %p\n", export);
		RETURN(-EINVAL);
	}

	/* mark the export disconnected, remembering whether someone else
	 * got here first */
	spin_lock(&export->exp_lock);
	already_disconnected = export->exp_disconnected;
	export->exp_disconnected = 1;
	/*  We hold references of export for uuid hash
	 *  and nid_hash and export link at least. So
	 *  it is safe to call cfs_hash_del in there.  */
	if (!hlist_unhashed(&export->exp_nid_hash))
		cfs_hash_del(export->exp_obd->obd_nid_hash,
			     &export->exp_connection->c_peer.nid,
			     &export->exp_nid_hash);
	spin_unlock(&export->exp_lock);

	/* class_cleanup(), abort_recovery(), and class_fail_export()
	 * all end up in here, and if any of them race we shouldn't
	 * call extra class_export_puts(). */
	if (already_disconnected) {
		LASSERT(hlist_unhashed(&export->exp_nid_hash));
		GOTO(no_disconn, already_disconnected);
	}

	CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
	       export->exp_handle.h_cookie);

	class_export_recovery_cleanup(export);
	class_unlink_export(export);
no_disconn:
	/* drop the reference for the pointer passed in */
	class_export_put(export);
	RETURN(0);
}
1545 EXPORT_SYMBOL(class_disconnect);
1546
1547 /* Return non-zero for a fully connected export */
1548 int class_connected_export(struct obd_export *exp)
1549 {
1550         int connected = 0;
1551
1552         if (exp) {
1553                 spin_lock(&exp->exp_lock);
1554                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1555                 spin_unlock(&exp->exp_lock);
1556         }
1557         return connected;
1558 }
1559 EXPORT_SYMBOL(class_connected_export);
1560
/* Disconnect every export on @list, tagging each with @flags first.
 *
 * The loop keeps taking the first entry until the list is empty, so it
 * relies on each iteration removing that entry from @list: done
 * explicitly for an export whose uuid equals the obd's, and expected as
 * a side effect of obd_disconnect() for all others. */
static void class_disconnect_export_list(struct list_head *list,
					 enum obd_option flags)
{
	int rc;
	struct obd_export *exp;
	ENTRY;

	/* It's possible that an export may disconnect itself, but
	 * nothing else will be added to this list. */
	while (!list_empty(list)) {
		exp = list_entry(list->next, struct obd_export,
				 exp_obd_chain);
		/* extra reference so that exp can still be used safely
		 * (e.g. in CDEBUG) after obd_disconnect() */
		class_export_get(exp);

		spin_lock(&exp->exp_lock);
		exp->exp_flags = flags;
		spin_unlock(&exp->exp_lock);

		if (obd_uuid_equals(&exp->exp_client_uuid,
				    &exp->exp_obd->obd_uuid)) {
			CDEBUG(D_HA,
			       "exp %p export uuid == obd uuid, don't discon\n",
			       exp);
			/* Need to delete this now so we don't end up pointing
			 * to work_list later when this export is cleaned up. */
			list_del_init(&exp->exp_obd_chain);
			class_export_put(exp);
			continue;
		}

		/* reference for obd_disconnect() to release */
		class_export_get(exp);
		CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
		       "last request at %lld\n",
		       exp->exp_obd->obd_name, obd_export_nid2str(exp),
		       exp, exp->exp_last_request_time);
		/* release one export reference anyway */
		rc = obd_disconnect(exp);

		CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
		       obd_export_nid2str(exp), exp, rc);
		/* drop the extra reference taken at the top of the loop */
		class_export_put(exp);
	}
	EXIT;
}
1606
1607 void class_disconnect_exports(struct obd_device *obd)
1608 {
1609         struct list_head work_list;
1610         ENTRY;
1611
1612         /* Move all of the exports from obd_exports to a work list, en masse. */
1613         INIT_LIST_HEAD(&work_list);
1614         spin_lock(&obd->obd_dev_lock);
1615         list_splice_init(&obd->obd_exports, &work_list);
1616         list_splice_init(&obd->obd_delayed_exports, &work_list);
1617         spin_unlock(&obd->obd_dev_lock);
1618
1619         if (!list_empty(&work_list)) {
1620                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1621                        "disconnecting them\n", obd->obd_minor, obd);
1622                 class_disconnect_export_list(&work_list,
1623                                              exp_flags_from_obd(obd));
1624         } else
1625                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1626                        obd->obd_minor, obd);
1627         EXIT;
1628 }
1629 EXPORT_SYMBOL(class_disconnect_exports);
1630
/* Remove exports that have not completed recovery.
 *
 * Walks obd_exports under obd_dev_lock and evicts every export that is
 * not the self export, has a last_rcvd slot, is not already failed and
 * for which @test_export() returns zero. Such exports are marked failed,
 * moved to a private list and disconnected after the lock is dropped,
 * with OBD_OPT_ABORT_RECOV added to the obd's export flags.
 *
 * @test_export is called with exp_lock held; a non-zero return keeps
 * the export. */
void class_disconnect_stale_exports(struct obd_device *obd,
				    int (*test_export)(struct obd_export *))
{
	struct list_head work_list;
	struct obd_export *exp, *n;
	int evicted = 0;
	ENTRY;

	INIT_LIST_HEAD(&work_list);
	spin_lock(&obd->obd_dev_lock);
	list_for_each_entry_safe(exp, n, &obd->obd_exports,
				 exp_obd_chain) {
		/* don't count self-export as client */
		if (obd_uuid_equals(&exp->exp_client_uuid,
				    &exp->exp_obd->obd_uuid))
			continue;

		/* don't evict clients which have no slot in last_rcvd
		 * (e.g. lightweight connection) */
		if (exp->exp_target_data.ted_lr_idx == -1)
			continue;

		/* skip exports that are already failed or that pass the
		 * caller's test; otherwise mark them failed */
		spin_lock(&exp->exp_lock);
		if (exp->exp_failed || test_export(exp)) {
			spin_unlock(&exp->exp_lock);
			continue;
		}
		exp->exp_failed = 1;
		spin_unlock(&exp->exp_lock);

		list_move(&exp->exp_obd_chain, &work_list);
		evicted++;
		CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
		       obd->obd_name, exp->exp_client_uuid.uuid,
		       obd_export_nid2str(exp));
		print_export_data(exp, "EVICTING", 0, D_HA);
	}
	spin_unlock(&obd->obd_dev_lock);

	if (evicted)
		LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
			      obd->obd_name, evicted);

	class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
						 OBD_OPT_ABORT_RECOV);
	EXIT;
}
1680 EXPORT_SYMBOL(class_disconnect_stale_exports);
1681
/* Mark an export as failed and disconnect it.
 *
 * Only the first caller to set exp_failed proceeds; later callers return
 * immediately. If obd_dump_on_timeout is set, the debug log is dumped
 * before disconnecting. */
void class_fail_export(struct obd_export *exp)
{
	int rc, already_failed;

	/* test-and-set exp_failed under exp_lock */
	spin_lock(&exp->exp_lock);
	already_failed = exp->exp_failed;
	exp->exp_failed = 1;
	spin_unlock(&exp->exp_lock);

	if (already_failed) {
		CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
		       exp, exp->exp_client_uuid.uuid);
		return;
	}

	CDEBUG(D_HA, "disconnecting export %p/%s\n",
	       exp, exp->exp_client_uuid.uuid);

	if (obd_dump_on_timeout)
		libcfs_debug_dumplog();

	/* extra reference so that exp can still be used safely (e.g. in
	 * CDEBUG) after obd_disconnect() */
	class_export_get(exp);

	/* Most callers into obd_disconnect are removing their own reference
	 * (request, for example) in addition to the one from the hash table.
	 * We don't have such a reference here, so make one. */
	class_export_get(exp);
	rc = obd_disconnect(exp);
	if (rc)
		CERROR("disconnecting export %p failed: %d\n", exp, rc);
	else
		CDEBUG(D_HA, "disconnected export %p/%s\n",
		       exp, exp->exp_client_uuid.uuid);
	/* drop the extra reference taken above */
	class_export_put(exp);
}
1718 EXPORT_SYMBOL(class_fail_export);
1719
1720 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1721 {
1722         struct cfs_hash *nid_hash;
1723         struct obd_export *doomed_exp = NULL;
1724         int exports_evicted = 0;
1725
1726         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1727
1728         spin_lock(&obd->obd_dev_lock);
1729         /* umount has run already, so evict thread should leave
1730          * its task to umount thread now */
1731         if (obd->obd_stopping) {
1732                 spin_unlock(&obd->obd_dev_lock);
1733                 return exports_evicted;
1734         }
1735         nid_hash = obd->obd_nid_hash;
1736         cfs_hash_getref(nid_hash);
1737         spin_unlock(&obd->obd_dev_lock);
1738
1739         do {
1740                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1741                 if (doomed_exp == NULL)
1742                         break;
1743
1744                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1745                          "nid %s found, wanted nid %s, requested nid %s\n",
1746                          obd_export_nid2str(doomed_exp),
1747                          libcfs_nid2str(nid_key), nid);
1748                 LASSERTF(doomed_exp != obd->obd_self_export,
1749                          "self-export is hashed by NID?\n");
1750                 exports_evicted++;
1751                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1752                               "request\n", obd->obd_name,
1753                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1754                               obd_export_nid2str(doomed_exp));
1755                 class_fail_export(doomed_exp);
1756                 class_export_put(doomed_exp);
1757         } while (1);
1758
1759         cfs_hash_putref(nid_hash);
1760
1761         if (!exports_evicted)
1762                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1763                        obd->obd_name, nid);
1764         return exports_evicted;
1765 }
1766 EXPORT_SYMBOL(obd_export_evict_by_nid);
1767
1768 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1769 {
1770         struct cfs_hash *uuid_hash;
1771         struct obd_export *doomed_exp = NULL;
1772         struct obd_uuid doomed_uuid;
1773         int exports_evicted = 0;
1774
1775         spin_lock(&obd->obd_dev_lock);
1776         if (obd->obd_stopping) {
1777                 spin_unlock(&obd->obd_dev_lock);
1778                 return exports_evicted;
1779         }
1780         uuid_hash = obd->obd_uuid_hash;
1781         cfs_hash_getref(uuid_hash);
1782         spin_unlock(&obd->obd_dev_lock);
1783
1784         obd_str2uuid(&doomed_uuid, uuid);
1785         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1786                 CERROR("%s: can't evict myself\n", obd->obd_name);
1787                 cfs_hash_putref(uuid_hash);
1788                 return exports_evicted;
1789         }
1790
1791         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1792
1793         if (doomed_exp == NULL) {
1794                 CERROR("%s: can't disconnect %s: no exports found\n",
1795                        obd->obd_name, uuid);
1796         } else {
1797                 CWARN("%s: evicting %s at adminstrative request\n",
1798                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1799                 class_fail_export(doomed_exp);
1800                 class_export_put(doomed_exp);
1801                 exports_evicted++;
1802         }
1803         cfs_hash_putref(uuid_hash);
1804
1805         return exports_evicted;
1806 }
1807
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback; when non-NULL, print_export_data() invokes it for an
 * export if it was asked to dump locks.  Starts out as NULL. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1812
1813 static void print_export_data(struct obd_export *exp, const char *status,
1814                               int locks, int debug_level)
1815 {
1816         struct ptlrpc_reply_state *rs;
1817         struct ptlrpc_reply_state *first_reply = NULL;
1818         int nreplies = 0;
1819
1820         spin_lock(&exp->exp_lock);
1821         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1822                             rs_exp_list) {
1823                 if (nreplies == 0)
1824                         first_reply = rs;
1825                 nreplies++;
1826         }
1827         spin_unlock(&exp->exp_lock);
1828
1829         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1830                "%p %s %llu stale:%d\n",
1831                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1832                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1833                atomic_read(&exp->exp_rpc_count),
1834                atomic_read(&exp->exp_cb_count),
1835                atomic_read(&exp->exp_locks_count),
1836                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1837                nreplies, first_reply, nreplies > 3 ? "..." : "",
1838                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1839 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1840         if (locks && class_export_dump_hook != NULL)
1841                 class_export_dump_hook(exp);
1842 #endif
1843 }
1844
1845 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1846 {
1847         struct obd_export *exp;
1848
1849         spin_lock(&obd->obd_dev_lock);
1850         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1851                 print_export_data(exp, "ACTIVE", locks, debug_level);
1852         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1853                 print_export_data(exp, "UNLINKED", locks, debug_level);
1854         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1855                 print_export_data(exp, "DELAYED", locks, debug_level);
1856         spin_unlock(&obd->obd_dev_lock);
1857 }
1858
1859 void obd_exports_barrier(struct obd_device *obd)
1860 {
1861         int waited = 2;
1862         LASSERT(list_empty(&obd->obd_exports));
1863         spin_lock(&obd->obd_dev_lock);
1864         while (!list_empty(&obd->obd_unlinked_exports)) {
1865                 spin_unlock(&obd->obd_dev_lock);
1866                 set_current_state(TASK_UNINTERRUPTIBLE);
1867                 schedule_timeout(cfs_time_seconds(waited));
1868                 if (waited > 5 && is_power_of_2(waited)) {
1869                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1870                                       "more than %d seconds. "
1871                                       "The obd refcount = %d. Is it stuck?\n",
1872                                       obd->obd_name, waited,
1873                                       atomic_read(&obd->obd_refcount));
1874                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1875                 }
1876                 waited *= 2;
1877                 spin_lock(&obd->obd_dev_lock);
1878         }
1879         spin_unlock(&obd->obd_dev_lock);
1880 }
1881 EXPORT_SYMBOL(obd_exports_barrier);
1882
1883 /**
1884  * Add export to the obd_zombe thread and notify it.
1885  */
1886 static void obd_zombie_export_add(struct obd_export *exp) {
1887         atomic_dec(&obd_stale_export_num);
1888         spin_lock(&exp->exp_obd->obd_dev_lock);
1889         LASSERT(!list_empty(&exp->exp_obd_chain));
1890         list_del_init(&exp->exp_obd_chain);
1891         spin_unlock(&exp->exp_obd->obd_dev_lock);
1892
1893         queue_work(zombie_wq, &exp->exp_zombie_work);
1894 }
1895
1896 /**
1897  * Add import to the obd_zombe thread and notify it.
1898  */
1899 static void obd_zombie_import_add(struct obd_import *imp) {
1900         LASSERT(imp->imp_sec == NULL);
1901
1902         queue_work(zombie_wq, &imp->imp_zombie_work);
1903 }
1904
1905 /**
1906  * wait when obd_zombie import/export queues become empty
1907  */
1908 void obd_zombie_barrier(void)
1909 {
1910         flush_workqueue(zombie_wq);
1911 }
1912 EXPORT_SYMBOL(obd_zombie_barrier);
1913
1914
1915 struct obd_export *obd_stale_export_get(void)
1916 {
1917         struct obd_export *exp = NULL;
1918         ENTRY;
1919
1920         spin_lock(&obd_stale_export_lock);
1921         if (!list_empty(&obd_stale_exports)) {
1922                 exp = list_entry(obd_stale_exports.next,
1923                                  struct obd_export, exp_stale_list);
1924                 list_del_init(&exp->exp_stale_list);
1925         }
1926         spin_unlock(&obd_stale_export_lock);
1927
1928         if (exp) {
1929                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1930                        atomic_read(&obd_stale_export_num));
1931         }
1932         RETURN(exp);
1933 }
1934 EXPORT_SYMBOL(obd_stale_export_get);
1935
1936 void obd_stale_export_put(struct obd_export *exp)
1937 {
1938         ENTRY;
1939
1940         LASSERT(list_empty(&exp->exp_stale_list));
1941         if (exp->exp_lock_hash &&
1942             atomic_read(&exp->exp_lock_hash->hs_count)) {
1943                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1944                        atomic_read(&obd_stale_export_num));
1945
1946                 spin_lock_bh(&exp->exp_bl_list_lock);
1947                 spin_lock(&obd_stale_export_lock);
1948                 /* Add to the tail if there is no blocked locks,
1949                  * to the head otherwise. */
1950                 if (list_empty(&exp->exp_bl_list))
1951                         list_add_tail(&exp->exp_stale_list,
1952                                       &obd_stale_exports);
1953                 else
1954                         list_add(&exp->exp_stale_list,
1955                                  &obd_stale_exports);
1956
1957                 spin_unlock(&obd_stale_export_lock);
1958                 spin_unlock_bh(&exp->exp_bl_list_lock);
1959         } else {
1960                 class_export_put(exp);
1961         }
1962         EXIT;
1963 }
1964 EXPORT_SYMBOL(obd_stale_export_put);
1965
1966 /**
1967  * Adjust the position of the export in the stale list,
1968  * i.e. move to the head of the list if is needed.
1969  **/
1970 void obd_stale_export_adjust(struct obd_export *exp)
1971 {
1972         LASSERT(exp != NULL);
1973         spin_lock_bh(&exp->exp_bl_list_lock);
1974         spin_lock(&obd_stale_export_lock);
1975
1976         if (!list_empty(&exp->exp_stale_list) &&
1977             !list_empty(&exp->exp_bl_list))
1978                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1979
1980         spin_unlock(&obd_stale_export_lock);
1981         spin_unlock_bh(&exp->exp_bl_list_lock);
1982 }
1983 EXPORT_SYMBOL(obd_stale_export_adjust);
1984
1985 /**
1986  * start destroy zombie import/export thread
1987  */
1988 int obd_zombie_impexp_init(void)
1989 {
1990         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1991         if (!zombie_wq)
1992                 return -ENOMEM;
1993
1994         return 0;
1995 }
1996
1997 /**
1998  * stop destroy zombie import/export thread
1999  */
2000 void obd_zombie_impexp_stop(void)
2001 {
2002         destroy_workqueue(zombie_wq);
2003         LASSERT(list_empty(&obd_stale_exports));
2004 }
2005
2006 /***** Kernel-userspace comm helpers *******/
2007
2008 /* Get length of entire message, including header */
2009 int kuc_len(int payload_len)
2010 {
2011         return sizeof(struct kuc_hdr) + payload_len;
2012 }
2013 EXPORT_SYMBOL(kuc_len);
2014
2015 /* Get a pointer to kuc header, given a ptr to the payload
2016  * @param p Pointer to payload area
2017  * @returns Pointer to kuc header
2018  */
2019 struct kuc_hdr * kuc_ptr(void *p)
2020 {
2021         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2022         LASSERT(lh->kuc_magic == KUC_MAGIC);
2023         return lh;
2024 }
2025 EXPORT_SYMBOL(kuc_ptr);
2026
2027 /* Alloc space for a message, and fill in header
2028  * @return Pointer to payload area
2029  */
2030 void *kuc_alloc(int payload_len, int transport, int type)
2031 {
2032         struct kuc_hdr *lh;
2033         int len = kuc_len(payload_len);
2034
2035         OBD_ALLOC(lh, len);
2036         if (lh == NULL)
2037                 return ERR_PTR(-ENOMEM);
2038
2039         lh->kuc_magic = KUC_MAGIC;
2040         lh->kuc_transport = transport;
2041         lh->kuc_msgtype = type;
2042         lh->kuc_msglen = len;
2043
2044         return (void *)(lh + 1);
2045 }
2046 EXPORT_SYMBOL(kuc_alloc);
2047
/* Free a message given a pointer to its payload area.
 * @param p           Pointer to payload area
 * @param payload_len Payload length; the freed size is computed from it
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);
	int total = kuc_len(payload_len);

	OBD_FREE(hdr, total);
}
EXPORT_SYMBOL(kuc_free);
2055
2056 struct obd_request_slot_waiter {
2057         struct list_head        orsw_entry;
2058         wait_queue_head_t       orsw_waitq;
2059         bool                    orsw_signaled;
2060 };
2061
2062 static bool obd_request_slot_avail(struct client_obd *cli,
2063                                    struct obd_request_slot_waiter *orsw)
2064 {
2065         bool avail;
2066
2067         spin_lock(&cli->cl_loi_list_lock);
2068         avail = !!list_empty(&orsw->orsw_entry);
2069         spin_unlock(&cli->cl_loi_list_lock);
2070
2071         return avail;
2072 };
2073
2074 /*
2075  * For network flow control, the RPC sponsor needs to acquire a credit
2076  * before sending the RPC. The credits count for a connection is defined
2077  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2078  * the subsequent RPC sponsors need to wait until others released their
2079  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2080  */
2081 int obd_get_request_slot(struct client_obd *cli)
2082 {
2083         struct obd_request_slot_waiter   orsw;
2084         struct l_wait_info               lwi;
2085         int                              rc;
2086
2087         spin_lock(&cli->cl_loi_list_lock);
2088         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2089                 cli->cl_rpcs_in_flight++;
2090                 spin_unlock(&cli->cl_loi_list_lock);
2091                 return 0;
2092         }
2093
2094         init_waitqueue_head(&orsw.orsw_waitq);
2095         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2096         orsw.orsw_signaled = false;
2097         spin_unlock(&cli->cl_loi_list_lock);
2098
2099         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2100         rc = l_wait_event(orsw.orsw_waitq,
2101                           obd_request_slot_avail(cli, &orsw) ||
2102                           orsw.orsw_signaled,
2103                           &lwi);
2104
2105         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2106          * freed but other (such as obd_put_request_slot) is using it. */
2107         spin_lock(&cli->cl_loi_list_lock);
2108         if (rc != 0) {
2109                 if (!orsw.orsw_signaled) {
2110                         if (list_empty(&orsw.orsw_entry))
2111                                 cli->cl_rpcs_in_flight--;
2112                         else
2113                                 list_del(&orsw.orsw_entry);
2114                 }
2115         }
2116
2117         if (orsw.orsw_signaled) {
2118                 LASSERT(list_empty(&orsw.orsw_entry));
2119
2120                 rc = -EINTR;
2121         }
2122         spin_unlock(&cli->cl_loi_list_lock);
2123
2124         return rc;
2125 }
2126 EXPORT_SYMBOL(obd_get_request_slot);
2127
2128 void obd_put_request_slot(struct client_obd *cli)
2129 {
2130         struct obd_request_slot_waiter *orsw;
2131
2132         spin_lock(&cli->cl_loi_list_lock);
2133         cli->cl_rpcs_in_flight--;
2134
2135         /* If there is free slot, wakeup the first waiter. */
2136         if (!list_empty(&cli->cl_flight_waiters) &&
2137             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2138                 orsw = list_entry(cli->cl_flight_waiters.next,
2139                                   struct obd_request_slot_waiter, orsw_entry);
2140                 list_del_init(&orsw->orsw_entry);
2141                 cli->cl_rpcs_in_flight++;
2142                 wake_up(&orsw->orsw_waitq);
2143         }
2144         spin_unlock(&cli->cl_loi_list_lock);
2145 }
2146 EXPORT_SYMBOL(obd_put_request_slot);
2147
2148 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2149 {
2150         return cli->cl_max_rpcs_in_flight;
2151 }
2152 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2153
2154 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2155 {
2156         struct obd_request_slot_waiter *orsw;
2157         __u32                           old;
2158         int                             diff;
2159         int                             i;
2160         char                            *typ_name;
2161         int                             rc;
2162
2163         if (max > OBD_MAX_RIF_MAX || max < 1)
2164                 return -ERANGE;
2165
2166         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2167         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2168                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2169                  * strictly lower that max_rpcs_in_flight */
2170                 if (max < 2) {
2171                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2172                                "because it must be higher than "
2173                                "max_mod_rpcs_in_flight value",
2174                                cli->cl_import->imp_obd->obd_name);
2175                         return -ERANGE;
2176                 }
2177                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2178                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2179                         if (rc != 0)
2180                                 return rc;
2181                 }
2182         }
2183
2184         spin_lock(&cli->cl_loi_list_lock);
2185         old = cli->cl_max_rpcs_in_flight;
2186         cli->cl_max_rpcs_in_flight = max;
2187         client_adjust_max_dirty(cli);
2188
2189         diff = max - old;
2190
2191         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2192         for (i = 0; i < diff; i++) {
2193                 if (list_empty(&cli->cl_flight_waiters))
2194                         break;
2195
2196                 orsw = list_entry(cli->cl_flight_waiters.next,
2197                                   struct obd_request_slot_waiter, orsw_entry);
2198                 list_del_init(&orsw->orsw_entry);
2199                 cli->cl_rpcs_in_flight++;
2200                 wake_up(&orsw->orsw_waitq);
2201         }
2202         spin_unlock(&cli->cl_loi_list_lock);
2203
2204         return 0;
2205 }
2206 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2207
2208 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2209 {
2210         return cli->cl_max_mod_rpcs_in_flight;
2211 }
2212 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2213
2214 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2215 {
2216         struct obd_connect_data *ocd;
2217         __u16 maxmodrpcs;
2218         __u16 prev;
2219
2220         if (max > OBD_MAX_RIF_MAX || max < 1)
2221                 return -ERANGE;
2222
2223         /* cannot exceed or equal max_rpcs_in_flight */
2224         if (max >= cli->cl_max_rpcs_in_flight) {
2225                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2226                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2227                        cli->cl_import->imp_obd->obd_name,
2228                        max, cli->cl_max_rpcs_in_flight);
2229                 return -ERANGE;
2230         }
2231
2232         /* cannot exceed max modify RPCs in flight supported by the server */
2233         ocd = &cli->cl_import->imp_connect_data;
2234         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2235                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2236         else
2237                 maxmodrpcs = 1;
2238         if (max > maxmodrpcs) {
2239                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2240                        "higher than max_mod_rpcs_per_client value (%hu) "
2241                        "returned by the server at connection\n",
2242                        cli->cl_import->imp_obd->obd_name,
2243                        max, maxmodrpcs);
2244                 return -ERANGE;
2245         }
2246
2247         spin_lock(&cli->cl_mod_rpcs_lock);
2248
2249         prev = cli->cl_max_mod_rpcs_in_flight;
2250         cli->cl_max_mod_rpcs_in_flight = max;
2251
2252         /* wakeup waiters if limit has been increased */
2253         if (cli->cl_max_mod_rpcs_in_flight > prev)
2254                 wake_up(&cli->cl_mod_rpcs_waitq);
2255
2256         spin_unlock(&cli->cl_mod_rpcs_lock);
2257
2258         return 0;
2259 }
2260 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2261
2262
2263 #define pct(a, b) (b ? a * 100 / b : 0)
2264 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2265                                struct seq_file *seq)
2266 {
2267         unsigned long mod_tot = 0, mod_cum;
2268         struct timespec64 now;
2269         int i;
2270
2271         ktime_get_real_ts64(&now);
2272
2273         spin_lock(&cli->cl_mod_rpcs_lock);
2274
2275         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2276                    (s64)now.tv_sec, now.tv_nsec);
2277         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2278                    cli->cl_mod_rpcs_in_flight);
2279
2280         seq_printf(seq, "\n\t\t\tmodify\n");
2281         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2282
2283         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2284
2285         mod_cum = 0;
2286         for (i = 0; i < OBD_HIST_MAX; i++) {
2287                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2288                 mod_cum += mod;
2289                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2290                            i, mod, pct(mod, mod_tot),
2291                            pct(mod_cum, mod_tot));
2292                 if (mod_cum == mod_tot)
2293                         break;
2294         }
2295
2296         spin_unlock(&cli->cl_mod_rpcs_lock);
2297
2298         return 0;
2299 }
2300 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2301 #undef pct
2302
2303
2304 /* The number of modify RPCs sent in parallel is limited
2305  * because the server has a finite number of slots per client to
2306  * store request result and ensure reply reconstruction when needed.
2307  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2308  * that takes into account server limit and cl_max_rpcs_in_flight
2309  * value.
2310  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2311  * one close request is allowed above the maximum.
2312  */
2313 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2314                                                  bool close_req)
2315 {
2316         bool avail;
2317
2318         /* A slot is available if
2319          * - number of modify RPCs in flight is less than the max
2320          * - it's a close RPC and no other close request is in flight
2321          */
2322         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2323                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2324
2325         return avail;
2326 }
2327
2328 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2329                                          bool close_req)
2330 {
2331         bool avail;
2332
2333         spin_lock(&cli->cl_mod_rpcs_lock);
2334         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2335         spin_unlock(&cli->cl_mod_rpcs_lock);
2336         return avail;
2337 }
2338
2339 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2340 {
2341         if (it != NULL &&
2342             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2343              it->it_op == IT_READDIR ||
2344              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2345                         return true;
2346         return false;
2347 }
2348
2349 /* Get a modify RPC slot from the obd client @cli according
2350  * to the kind of operation @opc that is going to be sent
2351  * and the intent @it of the operation if it applies.
2352  * If the maximum number of modify RPCs in flight is reached
2353  * the thread is put to sleep.
2354  * Returns the tag to be set in the request message. Tag 0
2355  * is reserved for non-modifying requests.
2356  */
2357 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2358                            struct lookup_intent *it)
2359 {
2360         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2361         bool                    close_req = false;
2362         __u16                   i, max;
2363
2364         /* read-only metadata RPCs don't consume a slot on MDT
2365          * for reply reconstruction
2366          */
2367         if (obd_skip_mod_rpc_slot(it))
2368                 return 0;
2369
2370         if (opc == MDS_CLOSE)
2371                 close_req = true;
2372
2373         do {
2374                 spin_lock(&cli->cl_mod_rpcs_lock);
2375                 max = cli->cl_max_mod_rpcs_in_flight;
2376                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2377                         /* there is a slot available */
2378                         cli->cl_mod_rpcs_in_flight++;
2379                         if (close_req)
2380                                 cli->cl_close_rpcs_in_flight++;
2381                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2382                                          cli->cl_mod_rpcs_in_flight);
2383                         /* find a free tag */
2384                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2385                                                 max + 1);
2386                         LASSERT(i < OBD_MAX_RIF_MAX);
2387                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2388                         spin_unlock(&cli->cl_mod_rpcs_lock);
2389                         /* tag 0 is reserved for non-modify RPCs */
2390                         return i + 1;
2391                 }
2392                 spin_unlock(&cli->cl_mod_rpcs_lock);
2393
2394                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2395                        "opc %u, max %hu\n",
2396                        cli->cl_import->imp_obd->obd_name, opc, max);
2397
2398                 l_wait_event_exclusive(cli->cl_mod_rpcs_waitq,
2399                                        obd_mod_rpc_slot_avail(cli, close_req),
2400                                        &lwi);
2401         } while (true);
2402 }
2403 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2404
2405 /* Put a modify RPC slot from the obd client @cli according
2406  * to the kind of operation @opc that has been sent and the
2407  * intent @it of the operation if it applies.
2408  */
2409 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2410                           struct lookup_intent *it, __u16 tag)
2411 {
2412         bool                    close_req = false;
2413
2414         if (obd_skip_mod_rpc_slot(it))
2415                 return;
2416
2417         if (opc == MDS_CLOSE)
2418                 close_req = true;
2419
2420         spin_lock(&cli->cl_mod_rpcs_lock);
2421         cli->cl_mod_rpcs_in_flight--;
2422         if (close_req)
2423                 cli->cl_close_rpcs_in_flight--;
2424         /* release the tag in the bitmap */
2425         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2426         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2427         spin_unlock(&cli->cl_mod_rpcs_lock);
2428         wake_up(&cli->cl_mod_rpcs_waitq);
2429 }
2430 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2431