Whamcloud - gitweb
9ea1d91b4b5104e6f37c94c40a6cd5a12417fd70
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
47
48 static DEFINE_SPINLOCK(obd_types_lock);
49 static LIST_HEAD(obd_types);
50 DEFINE_RWLOCK(obd_dev_lock);
51 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
52
53 static struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 static struct kmem_cache *import_cachep;
57
58 static LIST_HEAD(obd_zombie_imports);
59 static LIST_HEAD(obd_zombie_exports);
60 static DEFINE_SPINLOCK(obd_zombie_impexp_lock);
61
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65 static void print_export_data(struct obd_export *exp,
66                               const char *status, int locks, int debug_level);
67
68 static LIST_HEAD(obd_stale_exports);
69 static DEFINE_SPINLOCK(obd_stale_export_lock);
70 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
71
72 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
73 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
74
75 /*
76  * support functions: we could use inter-module communication, but this
77  * is more portable to other OS's
78  */
79 static struct obd_device *obd_device_alloc(void)
80 {
81         struct obd_device *obd;
82
83         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
84         if (obd != NULL) {
85                 obd->obd_magic = OBD_DEVICE_MAGIC;
86         }
87         return obd;
88 }
89
90 static void obd_device_free(struct obd_device *obd)
91 {
92         LASSERT(obd != NULL);
93         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
94                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
95         if (obd->obd_namespace != NULL) {
96                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
97                        obd, obd->obd_namespace, obd->obd_force);
98                 LBUG();
99         }
100         lu_ref_fini(&obd->obd_reference);
101         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
102 }
103
104 struct obd_type *class_search_type(const char *name)
105 {
106         struct list_head *tmp;
107         struct obd_type *type;
108
109         spin_lock(&obd_types_lock);
110         list_for_each(tmp, &obd_types) {
111                 type = list_entry(tmp, struct obd_type, typ_chain);
112                 if (strcmp(type->typ_name, name) == 0) {
113                         spin_unlock(&obd_types_lock);
114                         return type;
115                 }
116         }
117         spin_unlock(&obd_types_lock);
118         return NULL;
119 }
120 EXPORT_SYMBOL(class_search_type);
121
122 struct obd_type *class_get_type(const char *name)
123 {
124         struct obd_type *type = class_search_type(name);
125
126 #ifdef HAVE_MODULE_LOADING_SUPPORT
127         if (!type) {
128                 const char *modname = name;
129
130                 if (strcmp(modname, "obdfilter") == 0)
131                         modname = "ofd";
132
133                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
134                         modname = LUSTRE_OSP_NAME;
135
136                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
137                         modname = LUSTRE_MDT_NAME;
138
139                 if (!request_module("%s", modname)) {
140                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
141                         type = class_search_type(name);
142                 } else {
143                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144                                            modname);
145                 }
146         }
147 #endif
148         if (type) {
149                 spin_lock(&type->obd_type_lock);
150                 type->typ_refcnt++;
151                 try_module_get(type->typ_dt_ops->o_owner);
152                 spin_unlock(&type->obd_type_lock);
153         }
154         return type;
155 }
156
157 void class_put_type(struct obd_type *type)
158 {
159         LASSERT(type);
160         spin_lock(&type->obd_type_lock);
161         type->typ_refcnt--;
162         module_put(type->typ_dt_ops->o_owner);
163         spin_unlock(&type->obd_type_lock);
164 }
165
/* kobj_type release hook: free the memory of the kobject itself. */
static void class_sysfs_release(struct kobject *kobj)
{
	OBD_FREE_PTR(kobj);
}
170
171 static struct kobj_type class_ktype = {
172         .sysfs_ops      = &lustre_sysfs_ops,
173         .release        = class_sysfs_release,
174 };
175
176 struct kobject *class_setup_tunables(const char *name)
177 {
178         struct kobject *kobj;
179         int rc;
180
181 #ifdef HAVE_SERVER_SUPPORT
182         kobj = kset_find_obj(lustre_kset, name);
183         if (kobj)
184                 return kobj;
185 #endif
186         OBD_ALLOC(kobj, sizeof(*kobj));
187         if (!kobj)
188                 return ERR_PTR(-ENOMEM);
189
190         kobj->kset = lustre_kset;
191         kobject_init(kobj, &class_ktype);
192         rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
193         if (rc) {
194                 kobject_put(kobj);
195                 return ERR_PTR(rc);
196         }
197         return kobj;
198 }
199 EXPORT_SYMBOL(class_setup_tunables);
200
201 #define CLASS_MAX_NAME 1024
202
203 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
204                         bool enable_proc, struct lprocfs_vars *vars,
205                         const char *name, struct lu_device_type *ldt)
206 {
207         struct obd_type *type;
208 #ifdef HAVE_SERVER_SUPPORT
209         struct qstr dname;
210 #endif /* HAVE_SERVER_SUPPORT */
211         int rc = 0;
212
213         ENTRY;
214         /* sanity check */
215         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
216
217         if (class_search_type(name)) {
218                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
219                 RETURN(-EEXIST);
220         }
221
222         rc = -ENOMEM;
223         OBD_ALLOC(type, sizeof(*type));
224         if (type == NULL)
225                 RETURN(rc);
226
227         OBD_ALLOC_PTR(type->typ_dt_ops);
228         OBD_ALLOC_PTR(type->typ_md_ops);
229         OBD_ALLOC(type->typ_name, strlen(name) + 1);
230
231         if (type->typ_dt_ops == NULL ||
232             type->typ_md_ops == NULL ||
233             type->typ_name == NULL)
234                 GOTO (failed, rc);
235
236         *(type->typ_dt_ops) = *dt_ops;
237         /* md_ops is optional */
238         if (md_ops)
239                 *(type->typ_md_ops) = *md_ops;
240         strcpy(type->typ_name, name);
241         spin_lock_init(&type->obd_type_lock);
242
243 #ifdef CONFIG_PROC_FS
244         if (enable_proc) {
245                 type->typ_procroot = lprocfs_register(type->typ_name,
246                                                       proc_lustre_root,
247                                                       vars, type);
248                 if (IS_ERR(type->typ_procroot)) {
249                         rc = PTR_ERR(type->typ_procroot);
250                         type->typ_procroot = NULL;
251                         GOTO(failed, rc);
252                 }
253         }
254 #endif
255 #ifdef HAVE_SERVER_SUPPORT
256         dname.name = name;
257         dname.len = strlen(dname.name);
258         dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
259                                        dname.len);
260         type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
261         if (type->typ_debugfs_entry) {
262                 dput(type->typ_debugfs_entry);
263                 type->typ_sym_filter = true;
264                 goto dir_exist;
265         }
266 #endif /* HAVE_SERVER_SUPPORT */
267
268         type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
269                                                     debugfs_lustre_root,
270                                                     NULL, type);
271         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
272                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
273                                              : -ENOMEM;
274                 type->typ_debugfs_entry = NULL;
275                 GOTO(failed, rc);
276         }
277 #ifdef HAVE_SERVER_SUPPORT
278 dir_exist:
279 #endif
280         type->typ_kobj = class_setup_tunables(type->typ_name);
281         if (IS_ERR(type->typ_kobj))
282                 GOTO(failed, rc = PTR_ERR(type->typ_kobj));
283
284         if (ldt) {
285                 type->typ_lu = ldt;
286                 rc = lu_device_type_init(ldt);
287                 if (rc) {
288                         kobject_put(type->typ_kobj);
289                         GOTO(failed, rc);
290                 }
291         }
292
293         spin_lock(&obd_types_lock);
294         list_add(&type->typ_chain, &obd_types);
295         spin_unlock(&obd_types_lock);
296
297         RETURN(0);
298
299 failed:
300 #ifdef HAVE_SERVER_SUPPORT
301         if (type->typ_sym_filter)
302                 type->typ_debugfs_entry = NULL;
303 #endif
304         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
305                 ldebugfs_remove(&type->typ_debugfs_entry);
306         if (type->typ_name != NULL) {
307 #ifdef CONFIG_PROC_FS
308                 if (type->typ_procroot != NULL)
309                         remove_proc_subtree(type->typ_name, proc_lustre_root);
310 #endif
311                 OBD_FREE(type->typ_name, strlen(name) + 1);
312         }
313         if (type->typ_md_ops != NULL)
314                 OBD_FREE_PTR(type->typ_md_ops);
315         if (type->typ_dt_ops != NULL)
316                 OBD_FREE_PTR(type->typ_dt_ops);
317         OBD_FREE(type, sizeof(*type));
318         RETURN(rc);
319 }
320 EXPORT_SYMBOL(class_register_type);
321
322 int class_unregister_type(const char *name)
323 {
324         struct obd_type *type = class_search_type(name);
325         ENTRY;
326
327         if (!type) {
328                 CERROR("unknown obd type\n");
329                 RETURN(-EINVAL);
330         }
331
332         if (type->typ_refcnt) {
333                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
334                 /* This is a bad situation, let's make the best of it */
335                 /* Remove ops, but leave the name for debugging */
336                 OBD_FREE_PTR(type->typ_dt_ops);
337                 OBD_FREE_PTR(type->typ_md_ops);
338                 RETURN(-EBUSY);
339         }
340
341         kobject_put(type->typ_kobj);
342
343         /* we do not use type->typ_procroot as for compatibility purposes
344          * other modules can share names (i.e. lod can use lov entry). so
345          * we can't reference pointer as it can get invalided when another
346          * module removes the entry */
347 #ifdef CONFIG_PROC_FS
348         if (type->typ_procroot != NULL)
349                 remove_proc_subtree(type->typ_name, proc_lustre_root);
350         if (type->typ_procsym != NULL)
351                 lprocfs_remove(&type->typ_procsym);
352 #endif
353 #ifdef HAVE_SERVER_SUPPORT
354         if (type->typ_sym_filter)
355                 type->typ_debugfs_entry = NULL;
356 #endif
357         if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
358                 ldebugfs_remove(&type->typ_debugfs_entry);
359
360         if (type->typ_lu)
361                 lu_device_type_fini(type->typ_lu);
362
363         spin_lock(&obd_types_lock);
364         list_del(&type->typ_chain);
365         spin_unlock(&obd_types_lock);
366         OBD_FREE(type->typ_name, strlen(name) + 1);
367         if (type->typ_dt_ops != NULL)
368                 OBD_FREE_PTR(type->typ_dt_ops);
369         if (type->typ_md_ops != NULL)
370                 OBD_FREE_PTR(type->typ_md_ops);
371         OBD_FREE(type, sizeof(*type));
372         RETURN(0);
373 } /* class_unregister_type */
374 EXPORT_SYMBOL(class_unregister_type);
375
376 /**
377  * Create a new obd device.
378  *
379  * Allocate the new obd_device and initialize it.
380  *
381  * \param[in] type_name obd device type string.
382  * \param[in] name      obd device name.
383  * \param[in] uuid      obd device UUID
384  *
385  * \retval newdev         pointer to created obd_device
386  * \retval ERR_PTR(errno) on error
387  */
388 struct obd_device *class_newdev(const char *type_name, const char *name,
389                                 const char *uuid)
390 {
391         struct obd_device *newdev;
392         struct obd_type *type = NULL;
393         ENTRY;
394
395         if (strlen(name) >= MAX_OBD_NAME) {
396                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
397                 RETURN(ERR_PTR(-EINVAL));
398         }
399
400         type = class_get_type(type_name);
401         if (type == NULL){
402                 CERROR("OBD: unknown type: %s\n", type_name);
403                 RETURN(ERR_PTR(-ENODEV));
404         }
405
406         newdev = obd_device_alloc();
407         if (newdev == NULL) {
408                 class_put_type(type);
409                 RETURN(ERR_PTR(-ENOMEM));
410         }
411         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
412         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
413         newdev->obd_type = type;
414         newdev->obd_minor = -1;
415
416         rwlock_init(&newdev->obd_pool_lock);
417         newdev->obd_pool_limit = 0;
418         newdev->obd_pool_slv = 0;
419
420         INIT_LIST_HEAD(&newdev->obd_exports);
421         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
422         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
423         INIT_LIST_HEAD(&newdev->obd_exports_timed);
424         INIT_LIST_HEAD(&newdev->obd_nid_stats);
425         spin_lock_init(&newdev->obd_nid_lock);
426         spin_lock_init(&newdev->obd_dev_lock);
427         mutex_init(&newdev->obd_dev_mutex);
428         spin_lock_init(&newdev->obd_osfs_lock);
429         /* newdev->obd_osfs_age must be set to a value in the distant
430          * past to guarantee a fresh statfs is fetched on mount. */
431         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
432
433         /* XXX belongs in setup not attach  */
434         init_rwsem(&newdev->obd_observer_link_sem);
435         /* recovery data */
436         init_timer(&newdev->obd_recovery_timer);
437         spin_lock_init(&newdev->obd_recovery_task_lock);
438         init_waitqueue_head(&newdev->obd_next_transno_waitq);
439         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
440         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
441         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
442         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
443         INIT_LIST_HEAD(&newdev->obd_evict_list);
444         INIT_LIST_HEAD(&newdev->obd_lwp_list);
445
446         llog_group_init(&newdev->obd_olg);
447         /* Detach drops this */
448         atomic_set(&newdev->obd_refcount, 1);
449         lu_ref_init(&newdev->obd_reference);
450         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
451
452         newdev->obd_conn_inprogress = 0;
453
454         strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
455
456         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
457                newdev->obd_name, newdev);
458
459         return newdev;
460 }
461
462 /**
463  * Free obd device.
464  *
465  * \param[in] obd obd_device to be freed
466  *
467  * \retval none
468  */
469 void class_free_dev(struct obd_device *obd)
470 {
471         struct obd_type *obd_type = obd->obd_type;
472
473         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
474                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
475         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
476                  "obd %p != obd_devs[%d] %p\n",
477                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
478         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
479                  "obd_refcount should be 0, not %d\n",
480                  atomic_read(&obd->obd_refcount));
481         LASSERT(obd_type != NULL);
482
483         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
484                obd->obd_name, obd->obd_type->typ_name);
485
486         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
487                          obd->obd_name, obd->obd_uuid.uuid);
488         if (obd->obd_stopping) {
489                 int err;
490
491                 /* If we're not stopping, we were never set up */
492                 err = obd_cleanup(obd);
493                 if (err)
494                         CERROR("Cleanup %s returned %d\n",
495                                 obd->obd_name, err);
496         }
497
498         obd_device_free(obd);
499
500         class_put_type(obd_type);
501 }
502
503 /**
504  * Unregister obd device.
505  *
506  * Free slot in obd_dev[] used by \a obd.
507  *
508  * \param[in] new_obd obd_device to be unregistered
509  *
510  * \retval none
511  */
512 void class_unregister_device(struct obd_device *obd)
513 {
514         write_lock(&obd_dev_lock);
515         if (obd->obd_minor >= 0) {
516                 LASSERT(obd_devs[obd->obd_minor] == obd);
517                 obd_devs[obd->obd_minor] = NULL;
518                 obd->obd_minor = -1;
519         }
520         write_unlock(&obd_dev_lock);
521 }
522
523 /**
524  * Register obd device.
525  *
526  * Find free slot in obd_devs[], fills it with \a new_obd.
527  *
528  * \param[in] new_obd obd_device to be registered
529  *
530  * \retval 0          success
531  * \retval -EEXIST    device with this name is registered
532  * \retval -EOVERFLOW obd_devs[] is full
533  */
534 int class_register_device(struct obd_device *new_obd)
535 {
536         int ret = 0;
537         int i;
538         int new_obd_minor = 0;
539         bool minor_assign = false;
540         bool retried = false;
541
542 again:
543         write_lock(&obd_dev_lock);
544         for (i = 0; i < class_devno_max(); i++) {
545                 struct obd_device *obd = class_num2obd(i);
546
547                 if (obd != NULL &&
548                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
549
550                         if (!retried) {
551                                 write_unlock(&obd_dev_lock);
552
553                                 /* the obd_device could be waited to be
554                                  * destroyed by the "obd_zombie_impexp_thread".
555                                  */
556                                 obd_zombie_barrier();
557                                 retried = true;
558                                 goto again;
559                         }
560
561                         CERROR("%s: already exists, won't add\n",
562                                obd->obd_name);
563                         /* in case we found a free slot before duplicate */
564                         minor_assign = false;
565                         ret = -EEXIST;
566                         break;
567                 }
568                 if (!minor_assign && obd == NULL) {
569                         new_obd_minor = i;
570                         minor_assign = true;
571                 }
572         }
573
574         if (minor_assign) {
575                 new_obd->obd_minor = new_obd_minor;
576                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
577                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
578                 obd_devs[new_obd_minor] = new_obd;
579         } else {
580                 if (ret == 0) {
581                         ret = -EOVERFLOW;
582                         CERROR("%s: all %u/%u devices used, increase "
583                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
584                                i, class_devno_max(), ret);
585                 }
586         }
587         write_unlock(&obd_dev_lock);
588
589         RETURN(ret);
590 }
591
592 static int class_name2dev_nolock(const char *name)
593 {
594         int i;
595
596         if (!name)
597                 return -1;
598
599         for (i = 0; i < class_devno_max(); i++) {
600                 struct obd_device *obd = class_num2obd(i);
601
602                 if (obd && strcmp(name, obd->obd_name) == 0) {
603                         /* Make sure we finished attaching before we give
604                            out any references */
605                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
606                         if (obd->obd_attached) {
607                                 return i;
608                         }
609                         break;
610                 }
611         }
612
613         return -1;
614 }
615
616 int class_name2dev(const char *name)
617 {
618         int i;
619
620         if (!name)
621                 return -1;
622
623         read_lock(&obd_dev_lock);
624         i = class_name2dev_nolock(name);
625         read_unlock(&obd_dev_lock);
626
627         return i;
628 }
629 EXPORT_SYMBOL(class_name2dev);
630
631 struct obd_device *class_name2obd(const char *name)
632 {
633         int dev = class_name2dev(name);
634
635         if (dev < 0 || dev > class_devno_max())
636                 return NULL;
637         return class_num2obd(dev);
638 }
639 EXPORT_SYMBOL(class_name2obd);
640
641 int class_uuid2dev_nolock(struct obd_uuid *uuid)
642 {
643         int i;
644
645         for (i = 0; i < class_devno_max(); i++) {
646                 struct obd_device *obd = class_num2obd(i);
647
648                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
649                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
650                         return i;
651                 }
652         }
653
654         return -1;
655 }
656
657 int class_uuid2dev(struct obd_uuid *uuid)
658 {
659         int i;
660
661         read_lock(&obd_dev_lock);
662         i = class_uuid2dev_nolock(uuid);
663         read_unlock(&obd_dev_lock);
664
665         return i;
666 }
667 EXPORT_SYMBOL(class_uuid2dev);
668
669 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
670 {
671         int dev = class_uuid2dev(uuid);
672         if (dev < 0)
673                 return NULL;
674         return class_num2obd(dev);
675 }
676 EXPORT_SYMBOL(class_uuid2obd);
677
678 /**
679  * Get obd device from ::obd_devs[]
680  *
681  * \param num [in] array index
682  *
683  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
684  *         otherwise return the obd device there.
685  */
686 struct obd_device *class_num2obd(int num)
687 {
688         struct obd_device *obd = NULL;
689
690         if (num < class_devno_max()) {
691                 obd = obd_devs[num];
692                 if (obd == NULL)
693                         return NULL;
694
695                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
696                          "%p obd_magic %08x != %08x\n",
697                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
698                 LASSERTF(obd->obd_minor == num,
699                          "%p obd_minor %0d != %0d\n",
700                          obd, obd->obd_minor, num);
701         }
702
703         return obd;
704 }
705
706 /**
707  * Find obd in obd_dev[] by name or uuid.
708  *
709  * Increment obd's refcount if found.
710  *
711  * \param[in] str obd name or uuid
712  *
713  * \retval NULL    if not found
714  * \retval target  pointer to found obd_device
715  */
716 struct obd_device *class_dev_by_str(const char *str)
717 {
718         struct obd_device *target = NULL;
719         struct obd_uuid tgtuuid;
720         int rc;
721
722         obd_str2uuid(&tgtuuid, str);
723
724         read_lock(&obd_dev_lock);
725         rc = class_uuid2dev_nolock(&tgtuuid);
726         if (rc < 0)
727                 rc = class_name2dev_nolock(str);
728
729         if (rc >= 0)
730                 target = class_num2obd(rc);
731
732         if (target != NULL)
733                 class_incref(target, "find", current);
734         read_unlock(&obd_dev_lock);
735
736         RETURN(target);
737 }
738 EXPORT_SYMBOL(class_dev_by_str);
739
740 /**
741  * Get obd devices count. Device in any
742  *    state are counted
743  * \retval obd device count
744  */
745 int get_devices_count(void)
746 {
747         int index, max_index = class_devno_max(), dev_count = 0;
748
749         read_lock(&obd_dev_lock);
750         for (index = 0; index <= max_index; index++) {
751                 struct obd_device *obd = class_num2obd(index);
752                 if (obd != NULL)
753                         dev_count++;
754         }
755         read_unlock(&obd_dev_lock);
756
757         return dev_count;
758 }
759 EXPORT_SYMBOL(get_devices_count);
760
761 void class_obd_list(void)
762 {
763         char *status;
764         int i;
765
766         read_lock(&obd_dev_lock);
767         for (i = 0; i < class_devno_max(); i++) {
768                 struct obd_device *obd = class_num2obd(i);
769
770                 if (obd == NULL)
771                         continue;
772                 if (obd->obd_stopping)
773                         status = "ST";
774                 else if (obd->obd_set_up)
775                         status = "UP";
776                 else if (obd->obd_attached)
777                         status = "AT";
778                 else
779                         status = "--";
780                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
781                          i, status, obd->obd_type->typ_name,
782                          obd->obd_name, obd->obd_uuid.uuid,
783                          atomic_read(&obd->obd_refcount));
784         }
785         read_unlock(&obd_dev_lock);
786         return;
787 }
788
789 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
790    specified, then only the client with that uuid is returned,
791    otherwise any client connected to the tgt is returned. */
792 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
793                                           const char * typ_name,
794                                           struct obd_uuid *grp_uuid)
795 {
796         int i;
797
798         read_lock(&obd_dev_lock);
799         for (i = 0; i < class_devno_max(); i++) {
800                 struct obd_device *obd = class_num2obd(i);
801
802                 if (obd == NULL)
803                         continue;
804                 if ((strncmp(obd->obd_type->typ_name, typ_name,
805                              strlen(typ_name)) == 0)) {
806                         if (obd_uuid_equals(tgt_uuid,
807                                             &obd->u.cli.cl_target_uuid) &&
808                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
809                                                          &obd->obd_uuid) : 1)) {
810                                 read_unlock(&obd_dev_lock);
811                                 return obd;
812                         }
813                 }
814         }
815         read_unlock(&obd_dev_lock);
816
817         return NULL;
818 }
819 EXPORT_SYMBOL(class_find_client_obd);
820
821 /* Iterate the obd_device list looking devices have grp_uuid. Start
822    searching at *next, and if a device is found, the next index to look
823    at is saved in *next. If next is NULL, then the first matching device
824    will always be returned. */
825 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
826 {
827         int i;
828
829         if (next == NULL)
830                 i = 0;
831         else if (*next >= 0 && *next < class_devno_max())
832                 i = *next;
833         else
834                 return NULL;
835
836         read_lock(&obd_dev_lock);
837         for (; i < class_devno_max(); i++) {
838                 struct obd_device *obd = class_num2obd(i);
839
840                 if (obd == NULL)
841                         continue;
842                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
843                         if (next != NULL)
844                                 *next = i+1;
845                         read_unlock(&obd_dev_lock);
846                         return obd;
847                 }
848         }
849         read_unlock(&obd_dev_lock);
850
851         return NULL;
852 }
853 EXPORT_SYMBOL(class_devices_in_group);
854
855 /**
856  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
857  * adjust sptlrpc settings accordingly.
858  */
859 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
860 {
861         struct obd_device  *obd;
862         const char         *type;
863         int                 i, rc = 0, rc2;
864
865         LASSERT(namelen > 0);
866
867         read_lock(&obd_dev_lock);
868         for (i = 0; i < class_devno_max(); i++) {
869                 obd = class_num2obd(i);
870
871                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
872                         continue;
873
874                 /* only notify mdc, osc, osp, lwp, mdt, ost
875                  * because only these have a -sptlrpc llog */
876                 type = obd->obd_type->typ_name;
877                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
878                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
879                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
880                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
881                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
882                     strcmp(type, LUSTRE_OST_NAME) != 0)
883                         continue;
884
885                 if (strncmp(obd->obd_name, fsname, namelen))
886                         continue;
887
888                 class_incref(obd, __FUNCTION__, obd);
889                 read_unlock(&obd_dev_lock);
890                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
891                                          sizeof(KEY_SPTLRPC_CONF),
892                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
893                 rc = rc ? rc : rc2;
894                 class_decref(obd, __FUNCTION__, obd);
895                 read_lock(&obd_dev_lock);
896         }
897         read_unlock(&obd_dev_lock);
898         return rc;
899 }
900 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
901
902 void obd_cleanup_caches(void)
903 {
904         ENTRY;
905         if (obd_device_cachep) {
906                 kmem_cache_destroy(obd_device_cachep);
907                 obd_device_cachep = NULL;
908         }
909         if (obdo_cachep) {
910                 kmem_cache_destroy(obdo_cachep);
911                 obdo_cachep = NULL;
912         }
913         if (import_cachep) {
914                 kmem_cache_destroy(import_cachep);
915                 import_cachep = NULL;
916         }
917
918         EXIT;
919 }
920
921 int obd_init_caches(void)
922 {
923         int rc;
924         ENTRY;
925
926         LASSERT(obd_device_cachep == NULL);
927         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
928                                               sizeof(struct obd_device),
929                                               0, 0, NULL);
930         if (!obd_device_cachep)
931                 GOTO(out, rc = -ENOMEM);
932
933         LASSERT(obdo_cachep == NULL);
934         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
935                                         0, 0, NULL);
936         if (!obdo_cachep)
937                 GOTO(out, rc = -ENOMEM);
938
939         LASSERT(import_cachep == NULL);
940         import_cachep = kmem_cache_create("ll_import_cache",
941                                           sizeof(struct obd_import),
942                                           0, 0, NULL);
943         if (!import_cachep)
944                 GOTO(out, rc = -ENOMEM);
945
946         RETURN(0);
947 out:
948         obd_cleanup_caches();
949         RETURN(rc);
950 }
951
952 /* map connection to client */
953 struct obd_export *class_conn2export(struct lustre_handle *conn)
954 {
955         struct obd_export *export;
956         ENTRY;
957
958         if (!conn) {
959                 CDEBUG(D_CACHE, "looking for null handle\n");
960                 RETURN(NULL);
961         }
962
963         if (conn->cookie == -1) {  /* this means assign a new connection */
964                 CDEBUG(D_CACHE, "want a new connection\n");
965                 RETURN(NULL);
966         }
967
968         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
969         export = class_handle2object(conn->cookie, NULL);
970         RETURN(export);
971 }
972 EXPORT_SYMBOL(class_conn2export);
973
974 struct obd_device *class_exp2obd(struct obd_export *exp)
975 {
976         if (exp)
977                 return exp->exp_obd;
978         return NULL;
979 }
980 EXPORT_SYMBOL(class_exp2obd);
981
982 struct obd_device *class_conn2obd(struct lustre_handle *conn)
983 {
984         struct obd_export *export;
985         export = class_conn2export(conn);
986         if (export) {
987                 struct obd_device *obd = export->exp_obd;
988                 class_export_put(export);
989                 return obd;
990         }
991         return NULL;
992 }
993
994 struct obd_import *class_exp2cliimp(struct obd_export *exp)
995 {
996         struct obd_device *obd = exp->exp_obd;
997         if (obd == NULL)
998                 return NULL;
999         return obd->u.cli.cl_import;
1000 }
1001 EXPORT_SYMBOL(class_exp2cliimp);
1002
1003 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
1004 {
1005         struct obd_device *obd = class_conn2obd(conn);
1006         if (obd == NULL)
1007                 return NULL;
1008         return obd->u.cli.cl_import;
1009 }
1010
1011 /* Export management functions */
/**
 * Free an export whose reference count has dropped to zero.
 *
 * Releases the export's connection (if any), asserts that its reply, replay
 * and high-priority RPC lists are empty, calls obd_destroy_export(), drops
 * the device reference taken for non-self exports, and finally frees the
 * export through OBD_FREE_RCU().
 *
 * \param[in] exp  export to destroy; its refcount must be 0 and exp_obd
 *                 must be non-NULL (both asserted)
 */
static void class_export_destroy(struct obd_export *exp)
{
        struct obd_device *obd = exp->exp_obd;
        ENTRY;

        LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
        LASSERT(obd != NULL);

        CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
               exp->exp_client_uuid.uuid, obd->obd_name);

        /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
        if (exp->exp_connection)
                ptlrpc_put_connection_superhack(exp->exp_connection);

        /* nothing may still be queued on the export at this point */
        LASSERT(list_empty(&exp->exp_outstanding_replies));
        LASSERT(list_empty(&exp->exp_uncommitted_replies));
        LASSERT(list_empty(&exp->exp_req_replay_queue));
        LASSERT(list_empty(&exp->exp_hp_rpcs));
        obd_destroy_export(exp);
        /* self export doesn't hold a reference to an obd, although it
         * exists until freeing of the obd */
        if (exp != obd->obd_self_export)
                class_decref(obd, "export", exp);

        OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
        EXIT;
}
1040
/* hop_addref callback for export handles: takes one reference on the
 * export through class_export_get(). */
static void export_handle_addref(void *export)
{
        class_export_get(export);
}
1045
/* Handle operations registered for every export via class_handle_hash():
 * references are taken with export_handle_addref(); no free callback is
 * set. */
static struct portals_handle_ops export_handle_ops = {
        .hop_addref = export_handle_addref,
        .hop_free   = NULL,
};
1050
/**
 * Take one reference on \a exp.
 *
 * \param[in] exp  export to reference; must not be NULL (it is
 *                 dereferenced unconditionally)
 *
 * \retval \a exp, to allow the call to be used in an expression
 */
struct obd_export *class_export_get(struct obd_export *exp)
{
        atomic_inc(&exp->exp_refcount);
        CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
               atomic_read(&exp->exp_refcount));
        return exp;
}
EXPORT_SYMBOL(class_export_get);
1059
1060 void class_export_put(struct obd_export *exp)
1061 {
1062         LASSERT(exp != NULL);
1063         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1064         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1065                atomic_read(&exp->exp_refcount) - 1);
1066
1067         if (atomic_dec_and_test(&exp->exp_refcount)) {
1068                 struct obd_device *obd = exp->exp_obd;
1069
1070                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1071                        exp, exp->exp_client_uuid.uuid);
1072
1073                 /* release nid stat refererence */
1074                 lprocfs_exp_cleanup(exp);
1075
1076                 if (exp == obd->obd_self_export) {
1077                         /* self export should be destroyed without
1078                          * zombie thread as it doesn't hold a
1079                          * reference to obd and doesn't hold any
1080                          * resources */
1081                         class_export_destroy(exp);
1082                         /* self export is destroyed, no class
1083                          * references exist and it is safe to free
1084                          * obd */
1085                         class_free_dev(obd);
1086                 } else {
1087                         LASSERT(!list_empty(&exp->exp_obd_chain));
1088                         obd_zombie_export_add(exp);
1089                 }
1090
1091         }
1092 }
1093 EXPORT_SYMBOL(class_export_put);
1094 /* Creates a new export, adds it to the hash table, and returns a
1095  * pointer to it. The refcount is 2: one for the hash reference, and
1096  * one for the pointer returned by this function. */
1097 struct obd_export *__class_new_export(struct obd_device *obd,
1098                                       struct obd_uuid *cluuid, bool is_self)
1099 {
1100         struct obd_export *export;
1101         struct cfs_hash *hash = NULL;
1102         int rc = 0;
1103         ENTRY;
1104
1105         OBD_ALLOC_PTR(export);
1106         if (!export)
1107                 return ERR_PTR(-ENOMEM);
1108
1109         export->exp_conn_cnt = 0;
1110         export->exp_lock_hash = NULL;
1111         export->exp_flock_hash = NULL;
1112         /* 2 = class_handle_hash + last */
1113         atomic_set(&export->exp_refcount, 2);
1114         atomic_set(&export->exp_rpc_count, 0);
1115         atomic_set(&export->exp_cb_count, 0);
1116         atomic_set(&export->exp_locks_count, 0);
1117 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1118         INIT_LIST_HEAD(&export->exp_locks_list);
1119         spin_lock_init(&export->exp_locks_list_guard);
1120 #endif
1121         atomic_set(&export->exp_replay_count, 0);
1122         export->exp_obd = obd;
1123         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1124         spin_lock_init(&export->exp_uncommitted_replies_lock);
1125         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1126         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1127         INIT_LIST_HEAD(&export->exp_handle.h_link);
1128         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1129         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1130         class_handle_hash(&export->exp_handle, &export_handle_ops);
1131         export->exp_last_request_time = ktime_get_real_seconds();
1132         spin_lock_init(&export->exp_lock);
1133         spin_lock_init(&export->exp_rpc_lock);
1134         INIT_HLIST_NODE(&export->exp_uuid_hash);
1135         INIT_HLIST_NODE(&export->exp_nid_hash);
1136         INIT_HLIST_NODE(&export->exp_gen_hash);
1137         spin_lock_init(&export->exp_bl_list_lock);
1138         INIT_LIST_HEAD(&export->exp_bl_list);
1139         INIT_LIST_HEAD(&export->exp_stale_list);
1140
1141         export->exp_sp_peer = LUSTRE_SP_ANY;
1142         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1143         export->exp_client_uuid = *cluuid;
1144         obd_init_export(export);
1145
1146         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1147                 spin_lock(&obd->obd_dev_lock);
1148                 /* shouldn't happen, but might race */
1149                 if (obd->obd_stopping)
1150                         GOTO(exit_unlock, rc = -ENODEV);
1151
1152                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1153                 if (hash == NULL)
1154                         GOTO(exit_unlock, rc = -ENODEV);
1155                 spin_unlock(&obd->obd_dev_lock);
1156
1157                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1158                 if (rc != 0) {
1159                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1160                                       obd->obd_name, cluuid->uuid, rc);
1161                         GOTO(exit_err, rc = -EALREADY);
1162                 }
1163         }
1164
1165         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1166         spin_lock(&obd->obd_dev_lock);
1167         if (obd->obd_stopping) {
1168                 if (hash)
1169                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1170                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1171         }
1172
1173         if (!is_self) {
1174                 class_incref(obd, "export", export);
1175                 list_add_tail(&export->exp_obd_chain_timed,
1176                               &obd->obd_exports_timed);
1177                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1178                 obd->obd_num_exports++;
1179         } else {
1180                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1181                 INIT_LIST_HEAD(&export->exp_obd_chain);
1182         }
1183         spin_unlock(&obd->obd_dev_lock);
1184         if (hash)
1185                 cfs_hash_putref(hash);
1186         RETURN(export);
1187
1188 exit_unlock:
1189         spin_unlock(&obd->obd_dev_lock);
1190 exit_err:
1191         if (hash)
1192                 cfs_hash_putref(hash);
1193         class_handle_unhash(&export->exp_handle);
1194         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1195         obd_destroy_export(export);
1196         OBD_FREE_PTR(export);
1197         return ERR_PTR(rc);
1198 }
1199
/**
 * Create a regular (non-self) export for \a obd with client uuid \a uuid.
 *
 * Thin wrapper around __class_new_export() with is_self == false; returns
 * whatever that function returns (a new export or an ERR_PTR() value).
 */
struct obd_export *class_new_export(struct obd_device *obd,
                                    struct obd_uuid *uuid)
{
        return __class_new_export(obd, uuid, false);
}
EXPORT_SYMBOL(class_new_export);
1206
/**
 * Create the self export for \a obd with uuid \a uuid.
 *
 * Thin wrapper around __class_new_export() with is_self == true; returns
 * whatever that function returns (a new export or an ERR_PTR() value).
 */
struct obd_export *class_new_export_self(struct obd_device *obd,
                                         struct obd_uuid *uuid)
{
        return __class_new_export(obd, uuid, true);
}
1212
1213 void class_unlink_export(struct obd_export *exp)
1214 {
1215         class_handle_unhash(&exp->exp_handle);
1216
1217         if (exp->exp_obd->obd_self_export == exp) {
1218                 class_export_put(exp);
1219                 return;
1220         }
1221
1222         spin_lock(&exp->exp_obd->obd_dev_lock);
1223         /* delete an uuid-export hashitem from hashtables */
1224         if (!hlist_unhashed(&exp->exp_uuid_hash))
1225                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1226                              &exp->exp_client_uuid,
1227                              &exp->exp_uuid_hash);
1228
1229 #ifdef HAVE_SERVER_SUPPORT
1230         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1231                 struct tg_export_data   *ted = &exp->exp_target_data;
1232                 struct cfs_hash         *hash;
1233
1234                 /* Because obd_gen_hash will not be released until
1235                  * class_cleanup(), so hash should never be NULL here */
1236                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1237                 LASSERT(hash != NULL);
1238                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1239                              &exp->exp_gen_hash);
1240                 cfs_hash_putref(hash);
1241         }
1242 #endif /* HAVE_SERVER_SUPPORT */
1243
1244         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1245         list_del_init(&exp->exp_obd_chain_timed);
1246         exp->exp_obd->obd_num_exports--;
1247         spin_unlock(&exp->exp_obd->obd_dev_lock);
1248         atomic_inc(&obd_stale_export_num);
1249
1250         /* A reference is kept by obd_stale_exports list */
1251         obd_stale_export_put(exp);
1252 }
1253 EXPORT_SYMBOL(class_unlink_export);
1254
1255 /* Import management functions */
1256 static void class_import_destroy(struct obd_import *imp)
1257 {
1258         ENTRY;
1259
1260         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1261                 imp->imp_obd->obd_name);
1262
1263         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1264
1265         ptlrpc_put_connection_superhack(imp->imp_connection);
1266
1267         while (!list_empty(&imp->imp_conn_list)) {
1268                 struct obd_import_conn *imp_conn;
1269
1270                 imp_conn = list_entry(imp->imp_conn_list.next,
1271                                       struct obd_import_conn, oic_item);
1272                 list_del_init(&imp_conn->oic_item);
1273                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1274                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1275         }
1276
1277         LASSERT(imp->imp_sec == NULL);
1278         class_decref(imp->imp_obd, "import", imp);
1279         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1280         EXIT;
1281 }
1282
/* hop_addref callback for import handles: takes one reference on the
 * import through class_import_get(). */
static void import_handle_addref(void *import)
{
        class_import_get(import);
}
1287
/* Handle operations registered for every import via class_handle_hash():
 * references are taken with import_handle_addref(); no free callback is
 * set. */
static struct portals_handle_ops import_handle_ops = {
        .hop_addref = import_handle_addref,
        .hop_free   = NULL,
};
1292
/**
 * Take one reference on \a import.
 *
 * \param[in] import  import to reference; must not be NULL and must have a
 *                    valid imp_obd (both are dereferenced unconditionally)
 *
 * \retval \a import, to allow the call to be used in an expression
 */
struct obd_import *class_import_get(struct obd_import *import)
{
        atomic_inc(&import->imp_refcount);
        CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
               atomic_read(&import->imp_refcount),
               import->imp_obd->obd_name);
        return import;
}
EXPORT_SYMBOL(class_import_get);
1302
1303 void class_import_put(struct obd_import *imp)
1304 {
1305         ENTRY;
1306
1307         LASSERT(list_empty(&imp->imp_zombie_chain));
1308         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1309
1310         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1311                atomic_read(&imp->imp_refcount) - 1,
1312                imp->imp_obd->obd_name);
1313
1314         if (atomic_dec_and_test(&imp->imp_refcount)) {
1315                 CDEBUG(D_INFO, "final put import %p\n", imp);
1316                 obd_zombie_import_add(imp);
1317         }
1318
1319         /* catch possible import put race */
1320         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1321         EXIT;
1322 }
1323 EXPORT_SYMBOL(class_import_put);
1324
1325 static void init_imp_at(struct imp_at *at) {
1326         int i;
1327         at_init(&at->iat_net_latency, 0, 0);
1328         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1329                 /* max service estimates are tracked on the server side, so
1330                    don't use the AT history here, just use the last reported
1331                    val. (But keep hist for proc histogram, worst_ever) */
1332                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1333                         AT_FLG_NOHIST);
1334         }
1335 }
1336
/**
 * Allocate and initialise a new import for \a obd.
 *
 * The import takes a device reference (scope "import"), starts in state
 * LUSTRE_IMP_NEW with a refcount of 2, and is registered in the handle hash
 * with import_handle_ops.
 * NOTE(review): the two initial references are presumably one for the
 * handle hash and one for the returned pointer, as for exports -- confirm.
 *
 * \param[in] obd  device the import belongs to
 *
 * \retval new import on success
 * \retval NULL if the allocation failed
 */
struct obd_import *class_new_import(struct obd_device *obd)
{
        struct obd_import *imp;
        struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);

        OBD_ALLOC(imp, sizeof(*imp));
        if (imp == NULL)
                return NULL;

        INIT_LIST_HEAD(&imp->imp_pinger_chain);
        INIT_LIST_HEAD(&imp->imp_zombie_chain);
        INIT_LIST_HEAD(&imp->imp_replay_list);
        INIT_LIST_HEAD(&imp->imp_sending_list);
        INIT_LIST_HEAD(&imp->imp_delayed_list);
        INIT_LIST_HEAD(&imp->imp_committed_list);
        INIT_LIST_HEAD(&imp->imp_unreplied_list);
        imp->imp_known_replied_xid = 0;
        imp->imp_replay_cursor = &imp->imp_committed_list;
        spin_lock_init(&imp->imp_lock);
        imp->imp_last_success_conn = 0;
        imp->imp_state = LUSTRE_IMP_NEW;
        imp->imp_obd = class_incref(obd, "import", imp);
        mutex_init(&imp->imp_sec_mutex);
        init_waitqueue_head(&imp->imp_recovery_waitq);

        /* use the pid of the current pid namespace's child reaper as the
         * security reference pid, falling back to 1 when there is none */
        if (curr_pid_ns->child_reaper)
                imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
        else
                imp->imp_sec_refpid = 1;

        atomic_set(&imp->imp_refcount, 2);
        atomic_set(&imp->imp_unregistering, 0);
        atomic_set(&imp->imp_inflight, 0);
        atomic_set(&imp->imp_replay_inflight, 0);
        atomic_set(&imp->imp_inval_count, 0);
        INIT_LIST_HEAD(&imp->imp_conn_list);
        INIT_LIST_HEAD(&imp->imp_handle.h_link);
        class_handle_hash(&imp->imp_handle, &import_handle_ops);
        init_imp_at(&imp->imp_at);

        /* the default magic is V2, will be used in connect RPC, and
         * then adjusted according to the flags in request/reply. */
        imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;

        return imp;
}
EXPORT_SYMBOL(class_new_import);
1384
/**
 * Start tearing down \a import.
 *
 * Removes the import from the handle hash, bumps imp_generation under
 * imp_lock, and drops one reference with class_import_put().
 *
 * \param[in] import  import to tear down; must be neither NULL nor
 *                    LP_POISON (both asserted)
 */
void class_destroy_import(struct obd_import *import)
{
        LASSERT(import != NULL);
        LASSERT(import != LP_POISON);

        class_handle_unhash(&import->imp_handle);

        spin_lock(&import->imp_lock);
        import->imp_generation++;
        spin_unlock(&import->imp_lock);
        class_import_put(import);
}
EXPORT_SYMBOL(class_destroy_import);
1398
1399 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1400
1401 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1402 {
1403         spin_lock(&exp->exp_locks_list_guard);
1404
1405         LASSERT(lock->l_exp_refs_nr >= 0);
1406
1407         if (lock->l_exp_refs_target != NULL &&
1408             lock->l_exp_refs_target != exp) {
1409                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1410                               exp, lock, lock->l_exp_refs_target);
1411         }
1412         if ((lock->l_exp_refs_nr ++) == 0) {
1413                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1414                 lock->l_exp_refs_target = exp;
1415         }
1416         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1417                lock, exp, lock->l_exp_refs_nr);
1418         spin_unlock(&exp->exp_locks_list_guard);
1419 }
1420 EXPORT_SYMBOL(__class_export_add_lock_ref);
1421
1422 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1423 {
1424         spin_lock(&exp->exp_locks_list_guard);
1425         LASSERT(lock->l_exp_refs_nr > 0);
1426         if (lock->l_exp_refs_target != exp) {
1427                 LCONSOLE_WARN("lock %p, "
1428                               "mismatching export pointers: %p, %p\n",
1429                               lock, lock->l_exp_refs_target, exp);
1430         }
1431         if (-- lock->l_exp_refs_nr == 0) {
1432                 list_del_init(&lock->l_exp_refs_link);
1433                 lock->l_exp_refs_target = NULL;
1434         }
1435         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1436                lock, exp, lock->l_exp_refs_nr);
1437         spin_unlock(&exp->exp_locks_list_guard);
1438 }
1439 EXPORT_SYMBOL(__class_export_del_lock_ref);
1440 #endif
1441
1442 /* A connection defines an export context in which preallocation can
1443    be managed. This releases the export pointer reference, and returns
1444    the export handle, so the export refcount is 1 when this function
1445    returns. */
1446 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1447                   struct obd_uuid *cluuid)
1448 {
1449         struct obd_export *export;
1450         LASSERT(conn != NULL);
1451         LASSERT(obd != NULL);
1452         LASSERT(cluuid != NULL);
1453         ENTRY;
1454
1455         export = class_new_export(obd, cluuid);
1456         if (IS_ERR(export))
1457                 RETURN(PTR_ERR(export));
1458
1459         conn->cookie = export->exp_handle.h_cookie;
1460         class_export_put(export);
1461
1462         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1463                cluuid->uuid, conn->cookie);
1464         RETURN(0);
1465 }
1466 EXPORT_SYMBOL(class_connect);
1467
/**
 * If the export is involved in recovery, clean up the related state.
 *
 * While the device is recovering (checked under obd_recovery_task_lock):
 * clears exp_in_recovery and decrements obd_connected_clients if the export
 * was in recovery, and bumps obd_stale_clients unless the export is a
 * lightweight connection. Independently of recovery, clears the export's
 * request-replay and lock-replay flags under exp_lock and decrements the
 * matching per-device client counters.
 */
static void class_export_recovery_cleanup(struct obd_export *exp)
{
        struct obd_device *obd = exp->exp_obd;

        spin_lock(&obd->obd_recovery_task_lock);
        if (obd->obd_recovering) {
                if (exp->exp_in_recovery) {
                        spin_lock(&exp->exp_lock);
                        exp->exp_in_recovery = 0;
                        spin_unlock(&exp->exp_lock);
                        LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
                        atomic_dec(&obd->obd_connected_clients);
                }

                /* if called during recovery then should update
                 * obd_stale_clients counter,
                 * lightweight exports are not counted */
                if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
                        exp->exp_obd->obd_stale_clients++;
        }
        spin_unlock(&obd->obd_recovery_task_lock);

        spin_lock(&exp->exp_lock);
        /** Cleanup req replay fields */
        if (exp->exp_req_replay_needed) {
                exp->exp_req_replay_needed = 0;

                LASSERT(atomic_read(&obd->obd_req_replay_clients));
                atomic_dec(&obd->obd_req_replay_clients);
        }

        /** Cleanup lock replay data */
        if (exp->exp_lock_replay_needed) {
                exp->exp_lock_replay_needed = 0;

                LASSERT(atomic_read(&obd->obd_lock_replay_clients));
                atomic_dec(&obd->obd_lock_replay_clients);
        }
        spin_unlock(&exp->exp_lock);
}
1509
/**
 * Disconnect \a export.
 *
 * This function removes 1-3 references from the export:
 * 1 - for the export pointer passed in,
 * and, if a disconnect is really needed,
 * 2 - for removing it from the handle hash,
 * 3 - in class_unlink_export().
 * The export pointer passed to this function can be destroyed by the call.
 *
 * \param[in] export  export to disconnect
 *
 * \retval 0        on success, including when the export had already been
 *                  disconnected by a racing caller
 * \retval -EINVAL  if \a export is NULL
 */
int class_disconnect(struct obd_export *export)
{
        int already_disconnected;
        ENTRY;

        if (export == NULL) {
                CWARN("attempting to free NULL export %p\n", export);
                RETURN(-EINVAL);
        }

        /* mark the export disconnected, remembering whether someone else
         * had already done so */
        spin_lock(&export->exp_lock);
        already_disconnected = export->exp_disconnected;
        export->exp_disconnected = 1;
        /*  We hold references of export for uuid hash
         *  and nid_hash and export link at least. So
         *  it is safe to call cfs_hash_del in there.  */
        if (!hlist_unhashed(&export->exp_nid_hash))
                cfs_hash_del(export->exp_obd->obd_nid_hash,
                             &export->exp_connection->c_peer.nid,
                             &export->exp_nid_hash);
        spin_unlock(&export->exp_lock);

        /* class_cleanup(), abort_recovery(), and class_fail_export()
         * all end up in here, and if any of them race we shouldn't
         * call extra class_export_puts(). */
        if (already_disconnected) {
                LASSERT(hlist_unhashed(&export->exp_nid_hash));
                GOTO(no_disconn, already_disconnected);
        }

        CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
               export->exp_handle.h_cookie);

        class_export_recovery_cleanup(export);
        class_unlink_export(export);
no_disconn:
        /* drop the reference for the pointer passed in by the caller */
        class_export_put(export);
        RETURN(0);
}
EXPORT_SYMBOL(class_disconnect);
1556
1557 /* Return non-zero for a fully connected export */
1558 int class_connected_export(struct obd_export *exp)
1559 {
1560         int connected = 0;
1561
1562         if (exp) {
1563                 spin_lock(&exp->exp_lock);
1564                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1565                 spin_unlock(&exp->exp_lock);
1566         }
1567         return connected;
1568 }
1569 EXPORT_SYMBOL(class_connected_export);
1570
/**
 * Disconnect every export on \a list.
 *
 * The head of the list is processed repeatedly until the list is empty.
 * Each export gets \a flags stored in exp_flags. Exports whose client uuid
 * equals their device's uuid are only unlinked from the list; all others
 * are passed to obd_disconnect().
 *
 * \param[in] list   list of exports linked through exp_obd_chain
 * \param[in] flags  value assigned to each export's exp_flags
 */
static void class_disconnect_export_list(struct list_head *list,
                                         enum obd_option flags)
{
        int rc;
        struct obd_export *exp;
        ENTRY;

        /* It's possible that an export may disconnect itself, but
         * nothing else will be added to this list. */
        while (!list_empty(list)) {
                exp = list_entry(list->next, struct obd_export,
                                 exp_obd_chain);
                /* need for safe call CDEBUG after obd_disconnect */
                class_export_get(exp);

                spin_lock(&exp->exp_lock);
                exp->exp_flags = flags;
                spin_unlock(&exp->exp_lock);

                if (obd_uuid_equals(&exp->exp_client_uuid,
                                    &exp->exp_obd->obd_uuid)) {
                        CDEBUG(D_HA,
                               "exp %p export uuid == obd uuid, don't discon\n",
                               exp);
                        /* Need to delete this now so we don't end up pointing
                         * to work_list later when this export is cleaned up. */
                        list_del_init(&exp->exp_obd_chain);
                        class_export_put(exp);
                        continue;
                }

                /* second reference: this is the one obd_disconnect()
                 * releases (see comment below) */
                class_export_get(exp);
                CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
                       "last request at %lld\n",
                       exp->exp_obd->obd_name, obd_export_nid2str(exp),
                       exp, exp->exp_last_request_time);
                /* release one export reference anyway */
                rc = obd_disconnect(exp);

                CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
                       obd_export_nid2str(exp), exp, rc);
                /* drop the reference that kept exp valid for the CDEBUG */
                class_export_put(exp);
        }
        EXIT;
}
1616
1617 void class_disconnect_exports(struct obd_device *obd)
1618 {
1619         struct list_head work_list;
1620         ENTRY;
1621
1622         /* Move all of the exports from obd_exports to a work list, en masse. */
1623         INIT_LIST_HEAD(&work_list);
1624         spin_lock(&obd->obd_dev_lock);
1625         list_splice_init(&obd->obd_exports, &work_list);
1626         list_splice_init(&obd->obd_delayed_exports, &work_list);
1627         spin_unlock(&obd->obd_dev_lock);
1628
1629         if (!list_empty(&work_list)) {
1630                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1631                        "disconnecting them\n", obd->obd_minor, obd);
1632                 class_disconnect_export_list(&work_list,
1633                                              exp_flags_from_obd(obd));
1634         } else
1635                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1636                        obd->obd_minor, obd);
1637         EXIT;
1638 }
1639 EXPORT_SYMBOL(class_disconnect_exports);
1640
/**
 * Remove exports that have not completed recovery.
 *
 * Walks obd_exports under obd_dev_lock. Exports carrying the device's own
 * uuid, exports without a last_rcvd slot, exports already marked failed,
 * and exports for which \a test_export returns non-zero are left alone.
 * All others are marked failed, moved to a private list, and then
 * disconnected with OBD_OPT_ABORT_RECOV added to the device's export flags.
 *
 * \param[in] obd          device whose exports are checked
 * \param[in] test_export  predicate called with exp_lock held; a non-zero
 *                         result keeps the export
 */
void class_disconnect_stale_exports(struct obd_device *obd,
                                    int (*test_export)(struct obd_export *))
{
        struct list_head work_list;
        struct obd_export *exp, *n;
        int evicted = 0;
        ENTRY;

        INIT_LIST_HEAD(&work_list);
        spin_lock(&obd->obd_dev_lock);
        list_for_each_entry_safe(exp, n, &obd->obd_exports,
                                 exp_obd_chain) {
                /* don't count self-export as client */
                if (obd_uuid_equals(&exp->exp_client_uuid,
                                    &exp->exp_obd->obd_uuid))
                        continue;

                /* don't evict clients which have no slot in last_rcvd
                 * (e.g. lightweight connection) */
                if (exp->exp_target_data.ted_lr_idx == -1)
                        continue;

                spin_lock(&exp->exp_lock);
                if (exp->exp_failed || test_export(exp)) {
                        spin_unlock(&exp->exp_lock);
                        continue;
                }
                exp->exp_failed = 1;
                spin_unlock(&exp->exp_lock);

                list_move(&exp->exp_obd_chain, &work_list);
                evicted++;
                CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
                       obd->obd_name, exp->exp_client_uuid.uuid,
                       exp->exp_connection == NULL ? "<unknown>" :
                       libcfs_nid2str(exp->exp_connection->c_peer.nid));
                print_export_data(exp, "EVICTING", 0, D_HA);
        }
        spin_unlock(&obd->obd_dev_lock);

        if (evicted)
                LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
                              obd->obd_name, evicted);

        class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
                                                 OBD_OPT_ABORT_RECOV);
        EXIT;
}
EXPORT_SYMBOL(class_disconnect_stale_exports);
1692
/**
 * Mark \a exp as failed and disconnect it.
 *
 * The exp_failed flag is set under exp_lock; if it was already set the
 * function returns without doing anything else. Otherwise the debug log is
 * dumped when obd_dump_on_timeout is set, and the export is disconnected
 * through obd_disconnect().
 *
 * \param[in] exp  export to fail
 */
void class_fail_export(struct obd_export *exp)
{
        int rc, already_failed;

        spin_lock(&exp->exp_lock);
        already_failed = exp->exp_failed;
        exp->exp_failed = 1;
        spin_unlock(&exp->exp_lock);

        if (already_failed) {
                CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
                       exp, exp->exp_client_uuid.uuid);
                return;
        }

        CDEBUG(D_HA, "disconnecting export %p/%s\n",
               exp, exp->exp_client_uuid.uuid);

        if (obd_dump_on_timeout)
                libcfs_debug_dumplog();

        /* need for safe call CDEBUG after obd_disconnect */
        class_export_get(exp);

        /* Most callers into obd_disconnect are removing their own reference
         * (request, for example) in addition to the one from the hash table.
         * We don't have such a reference here, so make one. */
        class_export_get(exp);
        rc = obd_disconnect(exp);
        if (rc)
                CERROR("disconnecting export %p failed: %d\n", exp, rc);
        else
                CDEBUG(D_HA, "disconnected export %p/%s\n",
                       exp, exp->exp_client_uuid.uuid);
        /* drop the reference that kept exp valid for the messages above */
        class_export_put(exp);
}
EXPORT_SYMBOL(class_fail_export);
1730
1731 char *obd_export_nid2str(struct obd_export *exp)
1732 {
1733         if (exp->exp_connection != NULL)
1734                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1735
1736         return "(no nid)";
1737 }
1738 EXPORT_SYMBOL(obd_export_nid2str);
1739
1740 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1741 {
1742         struct cfs_hash *nid_hash;
1743         struct obd_export *doomed_exp = NULL;
1744         int exports_evicted = 0;
1745
1746         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1747
1748         spin_lock(&obd->obd_dev_lock);
1749         /* umount has run already, so evict thread should leave
1750          * its task to umount thread now */
1751         if (obd->obd_stopping) {
1752                 spin_unlock(&obd->obd_dev_lock);
1753                 return exports_evicted;
1754         }
1755         nid_hash = obd->obd_nid_hash;
1756         cfs_hash_getref(nid_hash);
1757         spin_unlock(&obd->obd_dev_lock);
1758
1759         do {
1760                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1761                 if (doomed_exp == NULL)
1762                         break;
1763
1764                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1765                          "nid %s found, wanted nid %s, requested nid %s\n",
1766                          obd_export_nid2str(doomed_exp),
1767                          libcfs_nid2str(nid_key), nid);
1768                 LASSERTF(doomed_exp != obd->obd_self_export,
1769                          "self-export is hashed by NID?\n");
1770                 exports_evicted++;
1771                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1772                               "request\n", obd->obd_name,
1773                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1774                               obd_export_nid2str(doomed_exp));
1775                 class_fail_export(doomed_exp);
1776                 class_export_put(doomed_exp);
1777         } while (1);
1778
1779         cfs_hash_putref(nid_hash);
1780
1781         if (!exports_evicted)
1782                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1783                        obd->obd_name, nid);
1784         return exports_evicted;
1785 }
1786 EXPORT_SYMBOL(obd_export_evict_by_nid);
1787
1788 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1789 {
1790         struct cfs_hash *uuid_hash;
1791         struct obd_export *doomed_exp = NULL;
1792         struct obd_uuid doomed_uuid;
1793         int exports_evicted = 0;
1794
1795         spin_lock(&obd->obd_dev_lock);
1796         if (obd->obd_stopping) {
1797                 spin_unlock(&obd->obd_dev_lock);
1798                 return exports_evicted;
1799         }
1800         uuid_hash = obd->obd_uuid_hash;
1801         cfs_hash_getref(uuid_hash);
1802         spin_unlock(&obd->obd_dev_lock);
1803
1804         obd_str2uuid(&doomed_uuid, uuid);
1805         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1806                 CERROR("%s: can't evict myself\n", obd->obd_name);
1807                 cfs_hash_putref(uuid_hash);
1808                 return exports_evicted;
1809         }
1810
1811         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1812
1813         if (doomed_exp == NULL) {
1814                 CERROR("%s: can't disconnect %s: no exports found\n",
1815                        obd->obd_name, uuid);
1816         } else {
1817                 CWARN("%s: evicting %s at adminstrative request\n",
1818                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1819                 class_fail_export(doomed_exp);
1820                 class_export_put(doomed_exp);
1821                 exports_evicted++;
1822         }
1823         cfs_hash_putref(uuid_hash);
1824
1825         return exports_evicted;
1826 }
1827
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback; invoked by print_export_data() when
 * lock dumping is requested and a hook has been installed. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1832
1833 static void print_export_data(struct obd_export *exp, const char *status,
1834                               int locks, int debug_level)
1835 {
1836         struct ptlrpc_reply_state *rs;
1837         struct ptlrpc_reply_state *first_reply = NULL;
1838         int nreplies = 0;
1839
1840         spin_lock(&exp->exp_lock);
1841         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1842                             rs_exp_list) {
1843                 if (nreplies == 0)
1844                         first_reply = rs;
1845                 nreplies++;
1846         }
1847         spin_unlock(&exp->exp_lock);
1848
1849         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1850                "%p %s %llu stale:%d\n",
1851                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1852                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1853                atomic_read(&exp->exp_rpc_count),
1854                atomic_read(&exp->exp_cb_count),
1855                atomic_read(&exp->exp_locks_count),
1856                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1857                nreplies, first_reply, nreplies > 3 ? "..." : "",
1858                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1859 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1860         if (locks && class_export_dump_hook != NULL)
1861                 class_export_dump_hook(exp);
1862 #endif
1863 }
1864
1865 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1866 {
1867         struct obd_export *exp;
1868
1869         spin_lock(&obd->obd_dev_lock);
1870         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1871                 print_export_data(exp, "ACTIVE", locks, debug_level);
1872         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1873                 print_export_data(exp, "UNLINKED", locks, debug_level);
1874         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1875                 print_export_data(exp, "DELAYED", locks, debug_level);
1876         spin_unlock(&obd->obd_dev_lock);
1877         spin_lock(&obd_zombie_impexp_lock);
1878         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1879                 print_export_data(exp, "ZOMBIE", locks, debug_level);
1880         spin_unlock(&obd_zombie_impexp_lock);
1881 }
1882
1883 void obd_exports_barrier(struct obd_device *obd)
1884 {
1885         int waited = 2;
1886         LASSERT(list_empty(&obd->obd_exports));
1887         spin_lock(&obd->obd_dev_lock);
1888         while (!list_empty(&obd->obd_unlinked_exports)) {
1889                 spin_unlock(&obd->obd_dev_lock);
1890                 set_current_state(TASK_UNINTERRUPTIBLE);
1891                 schedule_timeout(cfs_time_seconds(waited));
1892                 if (waited > 5 && is_power_of_2(waited)) {
1893                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1894                                       "more than %d seconds. "
1895                                       "The obd refcount = %d. Is it stuck?\n",
1896                                       obd->obd_name, waited,
1897                                       atomic_read(&obd->obd_refcount));
1898                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1899                 }
1900                 waited *= 2;
1901                 spin_lock(&obd->obd_dev_lock);
1902         }
1903         spin_unlock(&obd->obd_dev_lock);
1904 }
1905 EXPORT_SYMBOL(obd_exports_barrier);
1906
/* Number of zombie imports and exports queued for destruction; updated
 * under obd_zombie_impexp_lock. */
static int zombies_count;
1909
1910 /**
1911  * kill zombie imports and exports
1912  */
1913 void obd_zombie_impexp_cull(void)
1914 {
1915         struct obd_import *import;
1916         struct obd_export *export;
1917         ENTRY;
1918
1919         do {
1920                 spin_lock(&obd_zombie_impexp_lock);
1921
1922                 import = NULL;
1923                 if (!list_empty(&obd_zombie_imports)) {
1924                         import = list_entry(obd_zombie_imports.next,
1925                                             struct obd_import,
1926                                             imp_zombie_chain);
1927                         list_del_init(&import->imp_zombie_chain);
1928                 }
1929
1930                 export = NULL;
1931                 if (!list_empty(&obd_zombie_exports)) {
1932                         export = list_entry(obd_zombie_exports.next,
1933                                             struct obd_export,
1934                                             exp_obd_chain);
1935                         list_del_init(&export->exp_obd_chain);
1936                 }
1937
1938                 spin_unlock(&obd_zombie_impexp_lock);
1939
1940                 if (import != NULL) {
1941                         class_import_destroy(import);
1942                         spin_lock(&obd_zombie_impexp_lock);
1943                         zombies_count--;
1944                         spin_unlock(&obd_zombie_impexp_lock);
1945                 }
1946
1947                 if (export != NULL) {
1948                         class_export_destroy(export);
1949                         spin_lock(&obd_zombie_impexp_lock);
1950                         zombies_count--;
1951                         spin_unlock(&obd_zombie_impexp_lock);
1952                 }
1953
1954                 cond_resched();
1955         } while (import != NULL || export != NULL);
1956         EXIT;
1957 }
1958
/* Completed by obd_zombie_impexp_thread() when it starts running;
 * obd_zombie_impexp_init() waits for it. */
static DECLARE_COMPLETION(obd_zombie_start);
/* Completed by obd_zombie_impexp_thread() right before it returns;
 * obd_zombie_impexp_stop() waits for it. */
static DECLARE_COMPLETION(obd_zombie_stop);
/* Flag word of the zombie thread, accessed with set_bit()/test_bit(). */
static unsigned long obd_zombie_flags;
/* Wait queue shared by the zombie thread and obd_zombie_barrier() callers. */
static DECLARE_WAIT_QUEUE_HEAD(obd_zombie_waitq);
/* PID of the zombie thread, stored by the thread itself so that
 * obd_zombie_barrier() can avoid waiting on itself. */
static pid_t obd_zombie_pid;

enum {
        /* Request the zombie thread to exit.  NOTE: this value is passed to
         * set_bit()/test_bit() as a bit number, not used as a mask. */
        OBD_ZOMBIE_STOP         = 0x0001,
};
1968
1969 /**
1970  * check for work for kill zombie import/export thread.
1971  */
1972 static int obd_zombie_impexp_check(void *arg)
1973 {
1974         int rc;
1975
1976         spin_lock(&obd_zombie_impexp_lock);
1977         rc = (zombies_count == 0) &&
1978              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1979         spin_unlock(&obd_zombie_impexp_lock);
1980
1981         RETURN(rc);
1982 }
1983
1984 /**
1985  * Add export to the obd_zombe thread and notify it.
1986  */
1987 static void obd_zombie_export_add(struct obd_export *exp) {
1988         atomic_dec(&obd_stale_export_num);
1989         spin_lock(&exp->exp_obd->obd_dev_lock);
1990         LASSERT(!list_empty(&exp->exp_obd_chain));
1991         list_del_init(&exp->exp_obd_chain);
1992         spin_unlock(&exp->exp_obd->obd_dev_lock);
1993         spin_lock(&obd_zombie_impexp_lock);
1994         zombies_count++;
1995         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1996         spin_unlock(&obd_zombie_impexp_lock);
1997
1998         obd_zombie_impexp_notify();
1999 }
2000
2001 /**
2002  * Add import to the obd_zombe thread and notify it.
2003  */
2004 static void obd_zombie_import_add(struct obd_import *imp) {
2005         LASSERT(imp->imp_sec == NULL);
2006         spin_lock(&obd_zombie_impexp_lock);
2007         LASSERT(list_empty(&imp->imp_zombie_chain));
2008         zombies_count++;
2009         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
2010         spin_unlock(&obd_zombie_impexp_lock);
2011
2012         obd_zombie_impexp_notify();
2013 }
2014
/**
 * Notify the import/export destroy thread about a new zombie.
 */
static void obd_zombie_impexp_notify(void)
{
        /*
         * Wake every waiter so that obd_zombie_impexp_thread is sure to get
         * this notification.  Otherwise the wakeup could be consumed by an
         * obd_zombie_barrier() caller alone, which would go back to sleep
         * and leave the zombie thread sleeping too, causing a hang.
         */
        wake_up_all(&obd_zombie_waitq);
}
2027
2028 /**
2029  * check whether obd_zombie is idle
2030  */
2031 static int obd_zombie_is_idle(void)
2032 {
2033         int rc;
2034
2035         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
2036         spin_lock(&obd_zombie_impexp_lock);
2037         rc = (zombies_count == 0);
2038         spin_unlock(&obd_zombie_impexp_lock);
2039         return rc;
2040 }
2041
2042 /**
2043  * wait when obd_zombie import/export queues become empty
2044  */
2045 void obd_zombie_barrier(void)
2046 {
2047         struct l_wait_info lwi = { 0 };
2048
2049         if (obd_zombie_pid == current_pid())
2050                 /* don't wait for myself */
2051                 return;
2052         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
2053 }
2054 EXPORT_SYMBOL(obd_zombie_barrier);
2055
2056
2057 struct obd_export *obd_stale_export_get(void)
2058 {
2059         struct obd_export *exp = NULL;
2060         ENTRY;
2061
2062         spin_lock(&obd_stale_export_lock);
2063         if (!list_empty(&obd_stale_exports)) {
2064                 exp = list_entry(obd_stale_exports.next,
2065                                  struct obd_export, exp_stale_list);
2066                 list_del_init(&exp->exp_stale_list);
2067         }
2068         spin_unlock(&obd_stale_export_lock);
2069
2070         if (exp) {
2071                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
2072                        atomic_read(&obd_stale_export_num));
2073         }
2074         RETURN(exp);
2075 }
2076 EXPORT_SYMBOL(obd_stale_export_get);
2077
2078 void obd_stale_export_put(struct obd_export *exp)
2079 {
2080         ENTRY;
2081
2082         LASSERT(list_empty(&exp->exp_stale_list));
2083         if (exp->exp_lock_hash &&
2084             atomic_read(&exp->exp_lock_hash->hs_count)) {
2085                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
2086                        atomic_read(&obd_stale_export_num));
2087
2088                 spin_lock_bh(&exp->exp_bl_list_lock);
2089                 spin_lock(&obd_stale_export_lock);
2090                 /* Add to the tail if there is no blocked locks,
2091                  * to the head otherwise. */
2092                 if (list_empty(&exp->exp_bl_list))
2093                         list_add_tail(&exp->exp_stale_list,
2094                                       &obd_stale_exports);
2095                 else
2096                         list_add(&exp->exp_stale_list,
2097                                  &obd_stale_exports);
2098
2099                 spin_unlock(&obd_stale_export_lock);
2100                 spin_unlock_bh(&exp->exp_bl_list_lock);
2101         } else {
2102                 class_export_put(exp);
2103         }
2104         EXIT;
2105 }
2106 EXPORT_SYMBOL(obd_stale_export_put);
2107
2108 /**
2109  * Adjust the position of the export in the stale list,
2110  * i.e. move to the head of the list if is needed.
2111  **/
2112 void obd_stale_export_adjust(struct obd_export *exp)
2113 {
2114         LASSERT(exp != NULL);
2115         spin_lock_bh(&exp->exp_bl_list_lock);
2116         spin_lock(&obd_stale_export_lock);
2117
2118         if (!list_empty(&exp->exp_stale_list) &&
2119             !list_empty(&exp->exp_bl_list))
2120                 list_move(&exp->exp_stale_list, &obd_stale_exports);
2121
2122         spin_unlock(&obd_stale_export_lock);
2123         spin_unlock_bh(&exp->exp_bl_list_lock);
2124 }
2125 EXPORT_SYMBOL(obd_stale_export_adjust);
2126
2127 /**
2128  * destroy zombie export/import thread.
2129  */
2130 static int obd_zombie_impexp_thread(void *unused)
2131 {
2132         unshare_fs_struct();
2133         complete(&obd_zombie_start);
2134
2135         obd_zombie_pid = current_pid();
2136
2137         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
2138                 struct l_wait_info lwi = { 0 };
2139
2140                 l_wait_event(obd_zombie_waitq,
2141                              !obd_zombie_impexp_check(NULL), &lwi);
2142                 obd_zombie_impexp_cull();
2143
2144                 /*
2145                  * Notify obd_zombie_barrier callers that queues
2146                  * may be empty.
2147                  */
2148                 wake_up(&obd_zombie_waitq);
2149         }
2150
2151         complete(&obd_zombie_stop);
2152
2153         RETURN(0);
2154 }
2155
2156
2157 /**
2158  * start destroy zombie import/export thread
2159  */
2160 int obd_zombie_impexp_init(void)
2161 {
2162         struct task_struct *task;
2163
2164         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
2165         if (IS_ERR(task))
2166                 RETURN(PTR_ERR(task));
2167
2168         wait_for_completion(&obd_zombie_start);
2169         RETURN(0);
2170 }
/**
 * Stop the thread that destroys zombie imports and exports.
 *
 * Sets OBD_ZOMBIE_STOP, wakes the thread and waits until it has signalled
 * obd_zombie_stop.  The stale export list must be empty afterwards
 * (asserted).
 */
void obd_zombie_impexp_stop(void)
{
        set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
        obd_zombie_impexp_notify();
        wait_for_completion(&obd_zombie_stop);
        LASSERT(list_empty(&obd_stale_exports));
}
2181
2182 /***** Kernel-userspace comm helpers *******/
2183
2184 /* Get length of entire message, including header */
2185 int kuc_len(int payload_len)
2186 {
2187         return sizeof(struct kuc_hdr) + payload_len;
2188 }
2189 EXPORT_SYMBOL(kuc_len);
2190
2191 /* Get a pointer to kuc header, given a ptr to the payload
2192  * @param p Pointer to payload area
2193  * @returns Pointer to kuc header
2194  */
2195 struct kuc_hdr * kuc_ptr(void *p)
2196 {
2197         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2198         LASSERT(lh->kuc_magic == KUC_MAGIC);
2199         return lh;
2200 }
2201 EXPORT_SYMBOL(kuc_ptr);
2202
2203 /* Alloc space for a message, and fill in header
2204  * @return Pointer to payload area
2205  */
2206 void *kuc_alloc(int payload_len, int transport, int type)
2207 {
2208         struct kuc_hdr *lh;
2209         int len = kuc_len(payload_len);
2210
2211         OBD_ALLOC(lh, len);
2212         if (lh == NULL)
2213                 return ERR_PTR(-ENOMEM);
2214
2215         lh->kuc_magic = KUC_MAGIC;
2216         lh->kuc_transport = transport;
2217         lh->kuc_msgtype = type;
2218         lh->kuc_msglen = len;
2219
2220         return (void *)(lh + 1);
2221 }
2222 EXPORT_SYMBOL(kuc_alloc);
2223
/* Free a message obtained from kuc_alloc().
 * @p is the pointer to the payload area and @payload_len the payload size
 * used to compute the size of the allocation being freed.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *hdr = kuc_ptr(p);

        OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
2231
/* A thread waiting for an RPC slot; lives on the waiter's stack in
 * obd_get_request_slot(). */
struct obd_request_slot_waiter {
        /* link in client_obd::cl_flight_waiters; empty once a slot has
         * been handed to this waiter */
        struct list_head        orsw_entry;
        /* the waiter sleeps here until woken by a slot release or by an
         * increase of the limit */
        wait_queue_head_t       orsw_waitq;
        /* initialised to false by obd_get_request_slot(), which returns
         * -EINTR when it finds it set; NOTE(review): no code visible here
         * sets it to true -- confirm where it is set */
        bool                    orsw_signaled;
};
2237
2238 static bool obd_request_slot_avail(struct client_obd *cli,
2239                                    struct obd_request_slot_waiter *orsw)
2240 {
2241         bool avail;
2242
2243         spin_lock(&cli->cl_loi_list_lock);
2244         avail = !!list_empty(&orsw->orsw_entry);
2245         spin_unlock(&cli->cl_loi_list_lock);
2246
2247         return avail;
2248 };
2249
2250 /*
2251  * For network flow control, the RPC sponsor needs to acquire a credit
2252  * before sending the RPC. The credits count for a connection is defined
2253  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2254  * the subsequent RPC sponsors need to wait until others released their
2255  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2256  */
2257 int obd_get_request_slot(struct client_obd *cli)
2258 {
2259         struct obd_request_slot_waiter   orsw;
2260         struct l_wait_info               lwi;
2261         int                              rc;
2262
2263         spin_lock(&cli->cl_loi_list_lock);
2264         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2265                 cli->cl_rpcs_in_flight++;
2266                 spin_unlock(&cli->cl_loi_list_lock);
2267                 return 0;
2268         }
2269
2270         init_waitqueue_head(&orsw.orsw_waitq);
2271         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2272         orsw.orsw_signaled = false;
2273         spin_unlock(&cli->cl_loi_list_lock);
2274
2275         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2276         rc = l_wait_event(orsw.orsw_waitq,
2277                           obd_request_slot_avail(cli, &orsw) ||
2278                           orsw.orsw_signaled,
2279                           &lwi);
2280
2281         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2282          * freed but other (such as obd_put_request_slot) is using it. */
2283         spin_lock(&cli->cl_loi_list_lock);
2284         if (rc != 0) {
2285                 if (!orsw.orsw_signaled) {
2286                         if (list_empty(&orsw.orsw_entry))
2287                                 cli->cl_rpcs_in_flight--;
2288                         else
2289                                 list_del(&orsw.orsw_entry);
2290                 }
2291         }
2292
2293         if (orsw.orsw_signaled) {
2294                 LASSERT(list_empty(&orsw.orsw_entry));
2295
2296                 rc = -EINTR;
2297         }
2298         spin_unlock(&cli->cl_loi_list_lock);
2299
2300         return rc;
2301 }
2302 EXPORT_SYMBOL(obd_get_request_slot);
2303
2304 void obd_put_request_slot(struct client_obd *cli)
2305 {
2306         struct obd_request_slot_waiter *orsw;
2307
2308         spin_lock(&cli->cl_loi_list_lock);
2309         cli->cl_rpcs_in_flight--;
2310
2311         /* If there is free slot, wakeup the first waiter. */
2312         if (!list_empty(&cli->cl_flight_waiters) &&
2313             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2314                 orsw = list_entry(cli->cl_flight_waiters.next,
2315                                   struct obd_request_slot_waiter, orsw_entry);
2316                 list_del_init(&orsw->orsw_entry);
2317                 cli->cl_rpcs_in_flight++;
2318                 wake_up(&orsw->orsw_waitq);
2319         }
2320         spin_unlock(&cli->cl_loi_list_lock);
2321 }
2322 EXPORT_SYMBOL(obd_put_request_slot);
2323
/* Return the current limit on RPCs in flight of @cli
 * (cl_max_rpcs_in_flight).  The value is read without taking a lock. */
__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_max_rpcs_in_flight;
}
EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2329
2330 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2331 {
2332         struct obd_request_slot_waiter *orsw;
2333         __u32                           old;
2334         int                             diff;
2335         int                             i;
2336         char                            *typ_name;
2337         int                             rc;
2338
2339         if (max > OBD_MAX_RIF_MAX || max < 1)
2340                 return -ERANGE;
2341
2342         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2343         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2344                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2345                  * strictly lower that max_rpcs_in_flight */
2346                 if (max < 2) {
2347                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2348                                "because it must be higher than "
2349                                "max_mod_rpcs_in_flight value",
2350                                cli->cl_import->imp_obd->obd_name);
2351                         return -ERANGE;
2352                 }
2353                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2354                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2355                         if (rc != 0)
2356                                 return rc;
2357                 }
2358         }
2359
2360         spin_lock(&cli->cl_loi_list_lock);
2361         old = cli->cl_max_rpcs_in_flight;
2362         cli->cl_max_rpcs_in_flight = max;
2363         client_adjust_max_dirty(cli);
2364
2365         diff = max - old;
2366
2367         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2368         for (i = 0; i < diff; i++) {
2369                 if (list_empty(&cli->cl_flight_waiters))
2370                         break;
2371
2372                 orsw = list_entry(cli->cl_flight_waiters.next,
2373                                   struct obd_request_slot_waiter, orsw_entry);
2374                 list_del_init(&orsw->orsw_entry);
2375                 cli->cl_rpcs_in_flight++;
2376                 wake_up(&orsw->orsw_waitq);
2377         }
2378         spin_unlock(&cli->cl_loi_list_lock);
2379
2380         return 0;
2381 }
2382 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2383
/* Return the current limit on modify RPCs in flight of @cli
 * (cl_max_mod_rpcs_in_flight).  The value is read without taking a lock. */
__u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_max_mod_rpcs_in_flight;
}
EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2389
2390 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2391 {
2392         struct obd_connect_data *ocd;
2393         __u16 maxmodrpcs;
2394         __u16 prev;
2395
2396         if (max > OBD_MAX_RIF_MAX || max < 1)
2397                 return -ERANGE;
2398
2399         /* cannot exceed or equal max_rpcs_in_flight */
2400         if (max >= cli->cl_max_rpcs_in_flight) {
2401                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2402                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2403                        cli->cl_import->imp_obd->obd_name,
2404                        max, cli->cl_max_rpcs_in_flight);
2405                 return -ERANGE;
2406         }
2407
2408         /* cannot exceed max modify RPCs in flight supported by the server */
2409         ocd = &cli->cl_import->imp_connect_data;
2410         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2411                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2412         else
2413                 maxmodrpcs = 1;
2414         if (max > maxmodrpcs) {
2415                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2416                        "higher than max_mod_rpcs_per_client value (%hu) "
2417                        "returned by the server at connection\n",
2418                        cli->cl_import->imp_obd->obd_name,
2419                        max, maxmodrpcs);
2420                 return -ERANGE;
2421         }
2422
2423         spin_lock(&cli->cl_mod_rpcs_lock);
2424
2425         prev = cli->cl_max_mod_rpcs_in_flight;
2426         cli->cl_max_mod_rpcs_in_flight = max;
2427
2428         /* wakeup waiters if limit has been increased */
2429         if (cli->cl_max_mod_rpcs_in_flight > prev)
2430                 wake_up(&cli->cl_mod_rpcs_waitq);
2431
2432         spin_unlock(&cli->cl_mod_rpcs_lock);
2433
2434         return 0;
2435 }
2436 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2437
2438
2439 #define pct(a, b) (b ? a * 100 / b : 0)
2440 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2441                                struct seq_file *seq)
2442 {
2443         unsigned long mod_tot = 0, mod_cum;
2444         struct timespec64 now;
2445         int i;
2446
2447         ktime_get_real_ts64(&now);
2448
2449         spin_lock(&cli->cl_mod_rpcs_lock);
2450
2451         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2452                    (s64)now.tv_sec, now.tv_nsec);
2453         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2454                    cli->cl_mod_rpcs_in_flight);
2455
2456         seq_printf(seq, "\n\t\t\tmodify\n");
2457         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2458
2459         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2460
2461         mod_cum = 0;
2462         for (i = 0; i < OBD_HIST_MAX; i++) {
2463                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2464                 mod_cum += mod;
2465                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2466                            i, mod, pct(mod, mod_tot),
2467                            pct(mod_cum, mod_tot));
2468                 if (mod_cum == mod_tot)
2469                         break;
2470         }
2471
2472         spin_unlock(&cli->cl_mod_rpcs_lock);
2473
2474         return 0;
2475 }
2476 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2477 #undef pct
2478
2479
2480 /* The number of modify RPCs sent in parallel is limited
2481  * because the server has a finite number of slots per client to
2482  * store request result and ensure reply reconstruction when needed.
2483  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2484  * that takes into account server limit and cl_max_rpcs_in_flight
2485  * value.
2486  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2487  * one close request is allowed above the maximum.
2488  */
2489 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2490                                                  bool close_req)
2491 {
2492         bool avail;
2493
2494         /* A slot is available if
2495          * - number of modify RPCs in flight is less than the max
2496          * - it's a close RPC and no other close request is in flight
2497          */
2498         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2499                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2500
2501         return avail;
2502 }
2503
2504 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2505                                          bool close_req)
2506 {
2507         bool avail;
2508
2509         spin_lock(&cli->cl_mod_rpcs_lock);
2510         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2511         spin_unlock(&cli->cl_mod_rpcs_lock);
2512         return avail;
2513 }
2514
2515 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2516 {
2517         if (it != NULL &&
2518             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2519              it->it_op == IT_READDIR ||
2520              (it->it_op == IT_LAYOUT && !(it->it_flags & FMODE_WRITE))))
2521                         return true;
2522         return false;
2523 }
2524
2525 /* Get a modify RPC slot from the obd client @cli according
2526  * to the kind of operation @opc that is going to be sent
2527  * and the intent @it of the operation if it applies.
2528  * If the maximum number of modify RPCs in flight is reached
2529  * the thread is put to sleep.
2530  * Returns the tag to be set in the request message. Tag 0
2531  * is reserved for non-modifying requests.
2532  */
2533 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2534                            struct lookup_intent *it)
2535 {
2536         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2537         bool                    close_req = false;
2538         __u16                   i, max;
2539
2540         /* read-only metadata RPCs don't consume a slot on MDT
2541          * for reply reconstruction
2542          */
2543         if (obd_skip_mod_rpc_slot(it))
2544                 return 0;
2545
2546         if (opc == MDS_CLOSE)
2547                 close_req = true;
2548
2549         do {
2550                 spin_lock(&cli->cl_mod_rpcs_lock);
2551                 max = cli->cl_max_mod_rpcs_in_flight;
2552                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2553                         /* there is a slot available */
2554                         cli->cl_mod_rpcs_in_flight++;
2555                         if (close_req)
2556                                 cli->cl_close_rpcs_in_flight++;
2557                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2558                                          cli->cl_mod_rpcs_in_flight);
2559                         /* find a free tag */
2560                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2561                                                 max + 1);
2562                         LASSERT(i < OBD_MAX_RIF_MAX);
2563                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2564                         spin_unlock(&cli->cl_mod_rpcs_lock);
2565                         /* tag 0 is reserved for non-modify RPCs */
2566                         return i + 1;
2567                 }
2568                 spin_unlock(&cli->cl_mod_rpcs_lock);
2569
2570                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2571                        "opc %u, max %hu\n",
2572                        cli->cl_import->imp_obd->obd_name, opc, max);
2573
2574                 l_wait_event(cli->cl_mod_rpcs_waitq,
2575                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2576         } while (true);
2577 }
2578 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2579
2580 /* Put a modify RPC slot from the obd client @cli according
2581  * to the kind of operation @opc that has been sent and the
2582  * intent @it of the operation if it applies.
2583  */
2584 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2585                           struct lookup_intent *it, __u16 tag)
2586 {
2587         bool                    close_req = false;
2588
2589         if (obd_skip_mod_rpc_slot(it))
2590                 return;
2591
2592         if (opc == MDS_CLOSE)
2593                 close_req = true;
2594
2595         spin_lock(&cli->cl_mod_rpcs_lock);
2596         cli->cl_mod_rpcs_in_flight--;
2597         if (close_req)
2598                 cli->cl_close_rpcs_in_flight--;
2599         /* release the tag in the bitmap */
2600         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2601         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2602         spin_unlock(&cli->cl_mod_rpcs_lock);
2603         wake_up(&cli->cl_mod_rpcs_waitq);
2604 }
2605 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2606