Whamcloud - gitweb
LU-10467 lustre: use l_wait_event_abortable where appropriate.
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
/* Guards obd_devs[] and the obd_minor slot bookkeeping done in this file. */
49 DEFINE_RWLOCK(obd_dev_lock);
/* Table of registered devices indexed by obd_minor; a free slot is NULL. */
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
51
/* Slab cache from which struct obd_device objects are allocated. */
52 static struct kmem_cache *obd_device_cachep;
/* kobject type used for every obd_type; the definition follows below. */
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
55
/* Forward declarations of helpers defined later in this file. */
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59                               const char *status, int locks, int debug_level);
60
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
64
/*
 * Hook called by class_export_destroy() to release an export's connection.
 * NOTE(review): presumably installed by the ptlrpc module - confirm.
 */
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90                        obd, obd->obd_namespace, obd->obd_force);
91                 LBUG();
92         }
93         lu_ref_fini(&obd->obd_reference);
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96
97 struct obd_type *class_search_type(const char *name)
98 {
99         struct kobject *kobj = kset_find_obj(lustre_kset, name);
100
101         if (kobj && kobj->ktype == &class_ktype)
102                 return container_of(kobj, struct obd_type, typ_kobj);
103
104         kobject_put(kobj);
105         return NULL;
106 }
107 EXPORT_SYMBOL(class_search_type);
108
109 struct obd_type *class_get_type(const char *name)
110 {
111         struct obd_type *type;
112
113         type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
115         if (!type) {
116                 const char *modname = name;
117
118 #ifdef HAVE_SERVER_SUPPORT
119                 if (strcmp(modname, "obdfilter") == 0)
120                         modname = "ofd";
121
122                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123                         modname = LUSTRE_OSP_NAME;
124
125                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126                         modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 if (try_module_get(type->typ_dt_ops->o_owner)) {
140                         atomic_inc(&type->typ_refcnt);
141                         /* class_search_type() returned a counted reference,
142                          * but we don't need that count any more as
143                          * we have one through typ_refcnt.
144                          */
145                         kobject_put(&type->typ_kobj);
146                 } else {
147                         kobject_put(&type->typ_kobj);
148                         type = NULL;
149                 }
150         }
151         return type;
152 }
153
154 void class_put_type(struct obd_type *type)
155 {
156         LASSERT(type);
157         module_put(type->typ_dt_ops->o_owner);
158         atomic_dec(&type->typ_refcnt);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165         debugfs_remove_recursive(type->typ_debugfs_entry);
166         type->typ_debugfs_entry = NULL;
167
168         if (type->typ_lu)
169                 lu_device_type_fini(type->typ_lu);
170
171 #ifdef CONFIG_PROC_FS
172         if (type->typ_name && type->typ_procroot)
173                 remove_proc_subtree(type->typ_name, proc_lustre_root);
174 #endif
175         OBD_FREE(type, sizeof(*type));
176 }
177
/*
 * kobject type of every obd_type: class_search_type() uses it to recognise
 * obd_type kobjects, and class_sysfs_release() runs as its release callback.
 */
178 static struct kobj_type class_ktype = {
179         .sysfs_ops      = &lustre_sysfs_ops,
180         .release        = class_sysfs_release,
181 };
182
183 #ifdef HAVE_SERVER_SUPPORT
184 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
185 {
186         struct dentry *symlink;
187         struct obd_type *type;
188         int rc;
189
190         type = class_search_type(name);
191         if (type) {
192                 kobject_put(&type->typ_kobj);
193                 return ERR_PTR(-EEXIST);
194         }
195
196         OBD_ALLOC(type, sizeof(*type));
197         if (!type)
198                 return ERR_PTR(-ENOMEM);
199
200         type->typ_kobj.kset = lustre_kset;
201         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
202                                   &lustre_kset->kobj, "%s", name);
203         if (rc)
204                 return ERR_PTR(rc);
205
206         symlink = debugfs_create_dir(name, debugfs_lustre_root);
207         if (IS_ERR_OR_NULL(symlink)) {
208                 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
209                 kobject_put(&type->typ_kobj);
210                 return ERR_PTR(rc);
211         }
212         type->typ_debugfs_entry = symlink;
213         type->typ_sym_filter = true;
214
215         if (enable_proc) {
216                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
217                                                       NULL, NULL);
218                 if (IS_ERR(type->typ_procroot)) {
219                         CERROR("%s: can't create compat proc entry: %d\n",
220                                name, (int)PTR_ERR(type->typ_procroot));
221                         type->typ_procroot = NULL;
222                 }
223         }
224
225         return type;
226 }
227 EXPORT_SYMBOL(class_add_symlinks);
228 #endif /* HAVE_SERVER_SUPPORT */
229
/* Exclusive upper bound on the type name length asserted by
 * class_register_type().
 */
230 #define CLASS_MAX_NAME 1024
231
232 int class_register_type(const struct obd_ops *dt_ops,
233                         const struct md_ops *md_ops,
234                         bool enable_proc, struct lprocfs_vars *vars,
235                         const char *name, struct lu_device_type *ldt)
236 {
237         struct obd_type *type;
238         int rc;
239
240         ENTRY;
241         /* sanity check */
242         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
243
244         type = class_search_type(name);
245         if (type) {
246 #ifdef HAVE_SERVER_SUPPORT
247                 if (type->typ_sym_filter)
248                         goto dir_exist;
249 #endif /* HAVE_SERVER_SUPPORT */
250                 kobject_put(&type->typ_kobj);
251                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
252                 RETURN(-EEXIST);
253         }
254
255         OBD_ALLOC(type, sizeof(*type));
256         if (type == NULL)
257                 RETURN(-ENOMEM);
258
259         type->typ_kobj.kset = lustre_kset;
260         kobject_init(&type->typ_kobj, &class_ktype);
261 #ifdef HAVE_SERVER_SUPPORT
262 dir_exist:
263 #endif /* HAVE_SERVER_SUPPORT */
264
265         type->typ_dt_ops = dt_ops;
266         type->typ_md_ops = md_ops;
267
268 #ifdef HAVE_SERVER_SUPPORT
269         if (type->typ_sym_filter) {
270                 type->typ_sym_filter = false;
271                 kobject_put(&type->typ_kobj);
272                 goto setup_ldt;
273         }
274 #endif
275 #ifdef CONFIG_PROC_FS
276         if (enable_proc && !type->typ_procroot) {
277                 type->typ_procroot = lprocfs_register(name,
278                                                       proc_lustre_root,
279                                                       NULL, type);
280                 if (IS_ERR(type->typ_procroot)) {
281                         rc = PTR_ERR(type->typ_procroot);
282                         type->typ_procroot = NULL;
283                         GOTO(failed, rc);
284                 }
285         }
286 #endif
287         type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
288                                                     vars, type);
289         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
290                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
291                                              : -ENOMEM;
292                 type->typ_debugfs_entry = NULL;
293                 GOTO(failed, rc);
294         }
295
296         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
297         if (rc)
298                 GOTO(failed, rc);
299 #ifdef HAVE_SERVER_SUPPORT
300 setup_ldt:
301 #endif
302         if (ldt) {
303                 type->typ_lu = ldt;
304                 rc = lu_device_type_init(ldt);
305                 if (rc)
306                         GOTO(failed, rc);
307         }
308
309         RETURN(0);
310
311 failed:
312         kobject_put(&type->typ_kobj);
313
314         RETURN(rc);
315 }
316 EXPORT_SYMBOL(class_register_type);
317
318 int class_unregister_type(const char *name)
319 {
320         struct obd_type *type = class_search_type(name);
321         int rc = 0;
322         ENTRY;
323
324         if (!type) {
325                 CERROR("unknown obd type\n");
326                 RETURN(-EINVAL);
327         }
328
329         if (atomic_read(&type->typ_refcnt)) {
330                 CERROR("type %s has refcount (%d)\n", name,
331                        atomic_read(&type->typ_refcnt));
332                 /* This is a bad situation, let's make the best of it */
333                 /* Remove ops, but leave the name for debugging */
334                 type->typ_dt_ops = NULL;
335                 type->typ_md_ops = NULL;
336                 GOTO(out_put, rc = -EBUSY);
337         }
338
339         /* Put the final ref */
340         kobject_put(&type->typ_kobj);
341 out_put:
342         /* Put the ref returned by class_search_type() */
343         kobject_put(&type->typ_kobj);
344
345         RETURN(rc);
346 } /* class_unregister_type */
347 EXPORT_SYMBOL(class_unregister_type);
348
349 /**
350  * Create a new obd device.
351  *
352  * Allocate the new obd_device and initialize it.
353  *
354  * \param[in] type_name obd device type string.
355  * \param[in] name      obd device name.
356  * \param[in] uuid      obd device UUID
357  *
358  * \retval newdev         pointer to created obd_device
359  * \retval ERR_PTR(errno) on error
360  */
361 struct obd_device *class_newdev(const char *type_name, const char *name,
362                                 const char *uuid)
363 {
364         struct obd_device *newdev;
365         struct obd_type *type = NULL;
366         ENTRY;
367
368         if (strlen(name) >= MAX_OBD_NAME) {
369                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
370                 RETURN(ERR_PTR(-EINVAL));
371         }
372
373         type = class_get_type(type_name);
374         if (type == NULL){
375                 CERROR("OBD: unknown type: %s\n", type_name);
376                 RETURN(ERR_PTR(-ENODEV));
377         }
378
379         newdev = obd_device_alloc();
380         if (newdev == NULL) {
381                 class_put_type(type);
382                 RETURN(ERR_PTR(-ENOMEM));
383         }
384         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
385         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
386         newdev->obd_type = type;
387         newdev->obd_minor = -1;
388
389         rwlock_init(&newdev->obd_pool_lock);
390         newdev->obd_pool_limit = 0;
391         newdev->obd_pool_slv = 0;
392
393         INIT_LIST_HEAD(&newdev->obd_exports);
394         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
395         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
396         INIT_LIST_HEAD(&newdev->obd_exports_timed);
397         INIT_LIST_HEAD(&newdev->obd_nid_stats);
398         spin_lock_init(&newdev->obd_nid_lock);
399         spin_lock_init(&newdev->obd_dev_lock);
400         mutex_init(&newdev->obd_dev_mutex);
401         spin_lock_init(&newdev->obd_osfs_lock);
402         /* newdev->obd_osfs_age must be set to a value in the distant
403          * past to guarantee a fresh statfs is fetched on mount. */
404         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
405
406         /* XXX belongs in setup not attach  */
407         init_rwsem(&newdev->obd_observer_link_sem);
408         /* recovery data */
409         spin_lock_init(&newdev->obd_recovery_task_lock);
410         init_waitqueue_head(&newdev->obd_next_transno_waitq);
411         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
412         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
413         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
414         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
415         INIT_LIST_HEAD(&newdev->obd_evict_list);
416         INIT_LIST_HEAD(&newdev->obd_lwp_list);
417
418         llog_group_init(&newdev->obd_olg);
419         /* Detach drops this */
420         atomic_set(&newdev->obd_refcount, 1);
421         lu_ref_init(&newdev->obd_reference);
422         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
423
424         newdev->obd_conn_inprogress = 0;
425
426         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
427
428         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
429                newdev->obd_name, newdev);
430
431         return newdev;
432 }
433
434 /**
435  * Free obd device.
436  *
437  * \param[in] obd obd_device to be freed
438  *
439  * \retval none
440  */
441 void class_free_dev(struct obd_device *obd)
442 {
443         struct obd_type *obd_type = obd->obd_type;
444
445         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
446                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
447         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
448                  "obd %p != obd_devs[%d] %p\n",
449                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
450         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
451                  "obd_refcount should be 0, not %d\n",
452                  atomic_read(&obd->obd_refcount));
453         LASSERT(obd_type != NULL);
454
455         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
456                obd->obd_name, obd->obd_type->typ_name);
457
458         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
459                          obd->obd_name, obd->obd_uuid.uuid);
460         if (obd->obd_stopping) {
461                 int err;
462
463                 /* If we're not stopping, we were never set up */
464                 err = obd_cleanup(obd);
465                 if (err)
466                         CERROR("Cleanup %s returned %d\n",
467                                 obd->obd_name, err);
468         }
469
470         obd_device_free(obd);
471
472         class_put_type(obd_type);
473 }
474
475 /**
476  * Unregister obd device.
477  *
478  * Free slot in obd_dev[] used by \a obd.
479  *
480  * \param[in] new_obd obd_device to be unregistered
481  *
482  * \retval none
483  */
484 void class_unregister_device(struct obd_device *obd)
485 {
486         write_lock(&obd_dev_lock);
487         if (obd->obd_minor >= 0) {
488                 LASSERT(obd_devs[obd->obd_minor] == obd);
489                 obd_devs[obd->obd_minor] = NULL;
490                 obd->obd_minor = -1;
491         }
492         write_unlock(&obd_dev_lock);
493 }
494
495 /**
496  * Register obd device.
497  *
498  * Find free slot in obd_devs[], fills it with \a new_obd.
499  *
500  * \param[in] new_obd obd_device to be registered
501  *
502  * \retval 0          success
503  * \retval -EEXIST    device with this name is registered
504  * \retval -EOVERFLOW obd_devs[] is full
505  */
506 int class_register_device(struct obd_device *new_obd)
507 {
508         int ret = 0;
509         int i;
510         int new_obd_minor = 0;
511         bool minor_assign = false;
512         bool retried = false;
513
514 again:
515         write_lock(&obd_dev_lock);
516         for (i = 0; i < class_devno_max(); i++) {
517                 struct obd_device *obd = class_num2obd(i);
518
519                 if (obd != NULL &&
520                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
521
522                         if (!retried) {
523                                 write_unlock(&obd_dev_lock);
524
525                                 /* the obd_device could be waited to be
526                                  * destroyed by the "obd_zombie_impexp_thread".
527                                  */
528                                 obd_zombie_barrier();
529                                 retried = true;
530                                 goto again;
531                         }
532
533                         CERROR("%s: already exists, won't add\n",
534                                obd->obd_name);
535                         /* in case we found a free slot before duplicate */
536                         minor_assign = false;
537                         ret = -EEXIST;
538                         break;
539                 }
540                 if (!minor_assign && obd == NULL) {
541                         new_obd_minor = i;
542                         minor_assign = true;
543                 }
544         }
545
546         if (minor_assign) {
547                 new_obd->obd_minor = new_obd_minor;
548                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
549                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
550                 obd_devs[new_obd_minor] = new_obd;
551         } else {
552                 if (ret == 0) {
553                         ret = -EOVERFLOW;
554                         CERROR("%s: all %u/%u devices used, increase "
555                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
556                                i, class_devno_max(), ret);
557                 }
558         }
559         write_unlock(&obd_dev_lock);
560
561         RETURN(ret);
562 }
563
564 static int class_name2dev_nolock(const char *name)
565 {
566         int i;
567
568         if (!name)
569                 return -1;
570
571         for (i = 0; i < class_devno_max(); i++) {
572                 struct obd_device *obd = class_num2obd(i);
573
574                 if (obd && strcmp(name, obd->obd_name) == 0) {
575                         /* Make sure we finished attaching before we give
576                            out any references */
577                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
578                         if (obd->obd_attached) {
579                                 return i;
580                         }
581                         break;
582                 }
583         }
584
585         return -1;
586 }
587
588 int class_name2dev(const char *name)
589 {
590         int i;
591
592         if (!name)
593                 return -1;
594
595         read_lock(&obd_dev_lock);
596         i = class_name2dev_nolock(name);
597         read_unlock(&obd_dev_lock);
598
599         return i;
600 }
601 EXPORT_SYMBOL(class_name2dev);
602
603 struct obd_device *class_name2obd(const char *name)
604 {
605         int dev = class_name2dev(name);
606
607         if (dev < 0 || dev > class_devno_max())
608                 return NULL;
609         return class_num2obd(dev);
610 }
611 EXPORT_SYMBOL(class_name2obd);
612
613 int class_uuid2dev_nolock(struct obd_uuid *uuid)
614 {
615         int i;
616
617         for (i = 0; i < class_devno_max(); i++) {
618                 struct obd_device *obd = class_num2obd(i);
619
620                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
621                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
622                         return i;
623                 }
624         }
625
626         return -1;
627 }
628
629 int class_uuid2dev(struct obd_uuid *uuid)
630 {
631         int i;
632
633         read_lock(&obd_dev_lock);
634         i = class_uuid2dev_nolock(uuid);
635         read_unlock(&obd_dev_lock);
636
637         return i;
638 }
639 EXPORT_SYMBOL(class_uuid2dev);
640
641 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
642 {
643         int dev = class_uuid2dev(uuid);
644         if (dev < 0)
645                 return NULL;
646         return class_num2obd(dev);
647 }
648 EXPORT_SYMBOL(class_uuid2obd);
649
650 /**
651  * Get obd device from ::obd_devs[]
652  *
653  * \param num [in] array index
654  *
655  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
656  *         otherwise return the obd device there.
657  */
658 struct obd_device *class_num2obd(int num)
659 {
660         struct obd_device *obd = NULL;
661
662         if (num < class_devno_max()) {
663                 obd = obd_devs[num];
664                 if (obd == NULL)
665                         return NULL;
666
667                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
668                          "%p obd_magic %08x != %08x\n",
669                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
670                 LASSERTF(obd->obd_minor == num,
671                          "%p obd_minor %0d != %0d\n",
672                          obd, obd->obd_minor, num);
673         }
674
675         return obd;
676 }
677
678 /**
679  * Find obd in obd_dev[] by name or uuid.
680  *
681  * Increment obd's refcount if found.
682  *
683  * \param[in] str obd name or uuid
684  *
685  * \retval NULL    if not found
686  * \retval target  pointer to found obd_device
687  */
688 struct obd_device *class_dev_by_str(const char *str)
689 {
690         struct obd_device *target = NULL;
691         struct obd_uuid tgtuuid;
692         int rc;
693
694         obd_str2uuid(&tgtuuid, str);
695
696         read_lock(&obd_dev_lock);
697         rc = class_uuid2dev_nolock(&tgtuuid);
698         if (rc < 0)
699                 rc = class_name2dev_nolock(str);
700
701         if (rc >= 0)
702                 target = class_num2obd(rc);
703
704         if (target != NULL)
705                 class_incref(target, "find", current);
706         read_unlock(&obd_dev_lock);
707
708         RETURN(target);
709 }
710 EXPORT_SYMBOL(class_dev_by_str);
711
712 /**
713  * Get obd devices count. Device in any
714  *    state are counted
715  * \retval obd device count
716  */
717 int get_devices_count(void)
718 {
719         int index, max_index = class_devno_max(), dev_count = 0;
720
721         read_lock(&obd_dev_lock);
722         for (index = 0; index <= max_index; index++) {
723                 struct obd_device *obd = class_num2obd(index);
724                 if (obd != NULL)
725                         dev_count++;
726         }
727         read_unlock(&obd_dev_lock);
728
729         return dev_count;
730 }
731 EXPORT_SYMBOL(get_devices_count);
732
733 void class_obd_list(void)
734 {
735         char *status;
736         int i;
737
738         read_lock(&obd_dev_lock);
739         for (i = 0; i < class_devno_max(); i++) {
740                 struct obd_device *obd = class_num2obd(i);
741
742                 if (obd == NULL)
743                         continue;
744                 if (obd->obd_stopping)
745                         status = "ST";
746                 else if (obd->obd_set_up)
747                         status = "UP";
748                 else if (obd->obd_attached)
749                         status = "AT";
750                 else
751                         status = "--";
752                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
753                          i, status, obd->obd_type->typ_name,
754                          obd->obd_name, obd->obd_uuid.uuid,
755                          atomic_read(&obd->obd_refcount));
756         }
757         read_unlock(&obd_dev_lock);
758 }
759
760 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
761    specified, then only the client with that uuid is returned,
762    otherwise any client connected to the tgt is returned. */
763 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
764                                           const char *type_name,
765                                           struct obd_uuid *grp_uuid)
766 {
767         int i;
768
769         read_lock(&obd_dev_lock);
770         for (i = 0; i < class_devno_max(); i++) {
771                 struct obd_device *obd = class_num2obd(i);
772
773                 if (obd == NULL)
774                         continue;
775                 if ((strncmp(obd->obd_type->typ_name, type_name,
776                              strlen(type_name)) == 0)) {
777                         if (obd_uuid_equals(tgt_uuid,
778                                             &obd->u.cli.cl_target_uuid) &&
779                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
780                                                          &obd->obd_uuid) : 1)) {
781                                 read_unlock(&obd_dev_lock);
782                                 return obd;
783                         }
784                 }
785         }
786         read_unlock(&obd_dev_lock);
787
788         return NULL;
789 }
790 EXPORT_SYMBOL(class_find_client_obd);
791
792 /* Iterate the obd_device list looking devices have grp_uuid. Start
793    searching at *next, and if a device is found, the next index to look
794    at is saved in *next. If next is NULL, then the first matching device
795    will always be returned. */
796 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
797 {
798         int i;
799
800         if (next == NULL)
801                 i = 0;
802         else if (*next >= 0 && *next < class_devno_max())
803                 i = *next;
804         else
805                 return NULL;
806
807         read_lock(&obd_dev_lock);
808         for (; i < class_devno_max(); i++) {
809                 struct obd_device *obd = class_num2obd(i);
810
811                 if (obd == NULL)
812                         continue;
813                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
814                         if (next != NULL)
815                                 *next = i+1;
816                         read_unlock(&obd_dev_lock);
817                         return obd;
818                 }
819         }
820         read_unlock(&obd_dev_lock);
821
822         return NULL;
823 }
824 EXPORT_SYMBOL(class_devices_in_group);
825
826 /**
827  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
828  * adjust sptlrpc settings accordingly.
829  */
830 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
831 {
832         struct obd_device  *obd;
833         const char         *type;
834         int                 i, rc = 0, rc2;
835
836         LASSERT(namelen > 0);
837
838         read_lock(&obd_dev_lock);
839         for (i = 0; i < class_devno_max(); i++) {
840                 obd = class_num2obd(i);
841
842                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
843                         continue;
844
845                 /* only notify mdc, osc, osp, lwp, mdt, ost
846                  * because only these have a -sptlrpc llog */
847                 type = obd->obd_type->typ_name;
848                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
849                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
850                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
851                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
852                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
853                     strcmp(type, LUSTRE_OST_NAME) != 0)
854                         continue;
855
856                 if (strncmp(obd->obd_name, fsname, namelen))
857                         continue;
858
859                 class_incref(obd, __FUNCTION__, obd);
860                 read_unlock(&obd_dev_lock);
861                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
862                                          sizeof(KEY_SPTLRPC_CONF),
863                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
864                 rc = rc ? rc : rc2;
865                 class_decref(obd, __FUNCTION__, obd);
866                 read_lock(&obd_dev_lock);
867         }
868         read_unlock(&obd_dev_lock);
869         return rc;
870 }
871 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
872
873 void obd_cleanup_caches(void)
874 {
875         ENTRY;
876         if (obd_device_cachep) {
877                 kmem_cache_destroy(obd_device_cachep);
878                 obd_device_cachep = NULL;
879         }
880
881         EXIT;
882 }
883
884 int obd_init_caches(void)
885 {
886         int rc;
887         ENTRY;
888
889         LASSERT(obd_device_cachep == NULL);
890         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
891                                 sizeof(struct obd_device),
892                                 0, 0, 0, sizeof(struct obd_device), NULL);
893         if (!obd_device_cachep)
894                 GOTO(out, rc = -ENOMEM);
895
896         RETURN(0);
897 out:
898         obd_cleanup_caches();
899         RETURN(rc);
900 }
901
/* Owner tag that class_conn2export() passes to class_handle2object(). */
902 static const char export_handle_owner[] = "export";
903
904 /* map connection to client */
905 struct obd_export *class_conn2export(struct lustre_handle *conn)
906 {
907         struct obd_export *export;
908         ENTRY;
909
910         if (!conn) {
911                 CDEBUG(D_CACHE, "looking for null handle\n");
912                 RETURN(NULL);
913         }
914
915         if (conn->cookie == -1) {  /* this means assign a new connection */
916                 CDEBUG(D_CACHE, "want a new connection\n");
917                 RETURN(NULL);
918         }
919
920         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
921         export = class_handle2object(conn->cookie, export_handle_owner);
922         RETURN(export);
923 }
924 EXPORT_SYMBOL(class_conn2export);
925
926 struct obd_device *class_exp2obd(struct obd_export *exp)
927 {
928         if (exp)
929                 return exp->exp_obd;
930         return NULL;
931 }
932 EXPORT_SYMBOL(class_exp2obd);
933
934 struct obd_import *class_exp2cliimp(struct obd_export *exp)
935 {
936         struct obd_device *obd = exp->exp_obd;
937         if (obd == NULL)
938                 return NULL;
939         return obd->u.cli.cl_import;
940 }
941 EXPORT_SYMBOL(class_exp2cliimp);
942
943 /* Export management functions */
944 static void class_export_destroy(struct obd_export *exp)
945 {
946         struct obd_device *obd = exp->exp_obd;
947         ENTRY;
948
949         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
950         LASSERT(obd != NULL);
951
952         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
953                exp->exp_client_uuid.uuid, obd->obd_name);
954
955         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
956         if (exp->exp_connection)
957                 ptlrpc_put_connection_superhack(exp->exp_connection);
958
959         LASSERT(list_empty(&exp->exp_outstanding_replies));
960         LASSERT(list_empty(&exp->exp_uncommitted_replies));
961         LASSERT(list_empty(&exp->exp_req_replay_queue));
962         LASSERT(list_empty(&exp->exp_hp_rpcs));
963         obd_destroy_export(exp);
964         /* self export doesn't hold a reference to an obd, although it
965          * exists until freeing of the obd */
966         if (exp != obd->obd_self_export)
967                 class_decref(obd, "export", exp);
968
969         OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
970         kfree_rcu(exp, exp_handle.h_rcu);
971         EXIT;
972 }
973
974 struct obd_export *class_export_get(struct obd_export *exp)
975 {
976         refcount_inc(&exp->exp_handle.h_ref);
977         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
978                refcount_read(&exp->exp_handle.h_ref));
979         return exp;
980 }
981 EXPORT_SYMBOL(class_export_get);
982
983 void class_export_put(struct obd_export *exp)
984 {
985         LASSERT(exp != NULL);
986         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
987         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
988         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
989                refcount_read(&exp->exp_handle.h_ref) - 1);
990
991         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
992                 struct obd_device *obd = exp->exp_obd;
993
994                 CDEBUG(D_IOCTL, "final put %p/%s\n",
995                        exp, exp->exp_client_uuid.uuid);
996
997                 /* release nid stat refererence */
998                 lprocfs_exp_cleanup(exp);
999
1000                 if (exp == obd->obd_self_export) {
1001                         /* self export should be destroyed without
1002                          * zombie thread as it doesn't hold a
1003                          * reference to obd and doesn't hold any
1004                          * resources */
1005                         class_export_destroy(exp);
1006                         /* self export is destroyed, no class
1007                          * references exist and it is safe to free
1008                          * obd */
1009                         class_free_dev(obd);
1010                 } else {
1011                         LASSERT(!list_empty(&exp->exp_obd_chain));
1012                         obd_zombie_export_add(exp);
1013                 }
1014
1015         }
1016 }
1017 EXPORT_SYMBOL(class_export_put);
1018
1019 static void obd_zombie_exp_cull(struct work_struct *ws)
1020 {
1021         struct obd_export *export;
1022
1023         export = container_of(ws, struct obd_export, exp_zombie_work);
1024         class_export_destroy(export);
1025 }
1026
1027 /* Creates a new export, adds it to the hash table, and returns a
1028  * pointer to it. The refcount is 2: one for the hash reference, and
1029  * one for the pointer returned by this function. */
1030 struct obd_export *__class_new_export(struct obd_device *obd,
1031                                       struct obd_uuid *cluuid, bool is_self)
1032 {
1033         struct obd_export *export;
1034         int rc = 0;
1035         ENTRY;
1036
1037         OBD_ALLOC_PTR(export);
1038         if (!export)
1039                 return ERR_PTR(-ENOMEM);
1040
1041         export->exp_conn_cnt = 0;
1042         export->exp_lock_hash = NULL;
1043         export->exp_flock_hash = NULL;
1044         /* 2 = class_handle_hash + last */
1045         refcount_set(&export->exp_handle.h_ref, 2);
1046         atomic_set(&export->exp_rpc_count, 0);
1047         atomic_set(&export->exp_cb_count, 0);
1048         atomic_set(&export->exp_locks_count, 0);
1049 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1050         INIT_LIST_HEAD(&export->exp_locks_list);
1051         spin_lock_init(&export->exp_locks_list_guard);
1052 #endif
1053         atomic_set(&export->exp_replay_count, 0);
1054         export->exp_obd = obd;
1055         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1056         spin_lock_init(&export->exp_uncommitted_replies_lock);
1057         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1058         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1059         INIT_HLIST_NODE(&export->exp_handle.h_link);
1060         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1061         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1062         class_handle_hash(&export->exp_handle, export_handle_owner);
1063         export->exp_last_request_time = ktime_get_real_seconds();
1064         spin_lock_init(&export->exp_lock);
1065         spin_lock_init(&export->exp_rpc_lock);
1066         INIT_HLIST_NODE(&export->exp_nid_hash);
1067         INIT_HLIST_NODE(&export->exp_gen_hash);
1068         spin_lock_init(&export->exp_bl_list_lock);
1069         INIT_LIST_HEAD(&export->exp_bl_list);
1070         INIT_LIST_HEAD(&export->exp_stale_list);
1071         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1072
1073         export->exp_sp_peer = LUSTRE_SP_ANY;
1074         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1075         export->exp_client_uuid = *cluuid;
1076         obd_init_export(export);
1077
1078         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1079
1080         spin_lock(&obd->obd_dev_lock);
1081         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1082                 /* shouldn't happen, but might race */
1083                 if (obd->obd_stopping)
1084                         GOTO(exit_unlock, rc = -ENODEV);
1085
1086                 rc = obd_uuid_add(obd, export);
1087                 if (rc != 0) {
1088                         LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1089                                       obd->obd_name, cluuid->uuid, rc);
1090                         GOTO(exit_unlock, rc = -EALREADY);
1091                 }
1092         }
1093
1094         if (!is_self) {
1095                 class_incref(obd, "export", export);
1096                 list_add_tail(&export->exp_obd_chain_timed,
1097                               &obd->obd_exports_timed);
1098                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1099                 obd->obd_num_exports++;
1100         } else {
1101                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1102                 INIT_LIST_HEAD(&export->exp_obd_chain);
1103         }
1104         spin_unlock(&obd->obd_dev_lock);
1105         RETURN(export);
1106
1107 exit_unlock:
1108         spin_unlock(&obd->obd_dev_lock);
1109         class_handle_unhash(&export->exp_handle);
1110         obd_destroy_export(export);
1111         OBD_FREE_PTR(export);
1112         return ERR_PTR(rc);
1113 }
1114
1115 struct obd_export *class_new_export(struct obd_device *obd,
1116                                     struct obd_uuid *uuid)
1117 {
1118         return __class_new_export(obd, uuid, false);
1119 }
1120 EXPORT_SYMBOL(class_new_export);
1121
1122 struct obd_export *class_new_export_self(struct obd_device *obd,
1123                                          struct obd_uuid *uuid)
1124 {
1125         return __class_new_export(obd, uuid, true);
1126 }
1127
1128 void class_unlink_export(struct obd_export *exp)
1129 {
1130         class_handle_unhash(&exp->exp_handle);
1131
1132         if (exp->exp_obd->obd_self_export == exp) {
1133                 class_export_put(exp);
1134                 return;
1135         }
1136
1137         spin_lock(&exp->exp_obd->obd_dev_lock);
1138         /* delete an uuid-export hashitem from hashtables */
1139         if (exp != exp->exp_obd->obd_self_export)
1140                 obd_uuid_del(exp->exp_obd, exp);
1141
1142 #ifdef HAVE_SERVER_SUPPORT
1143         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1144                 struct tg_export_data   *ted = &exp->exp_target_data;
1145                 struct cfs_hash         *hash;
1146
1147                 /* Because obd_gen_hash will not be released until
1148                  * class_cleanup(), so hash should never be NULL here */
1149                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1150                 LASSERT(hash != NULL);
1151                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1152                              &exp->exp_gen_hash);
1153                 cfs_hash_putref(hash);
1154         }
1155 #endif /* HAVE_SERVER_SUPPORT */
1156
1157         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1158         list_del_init(&exp->exp_obd_chain_timed);
1159         exp->exp_obd->obd_num_exports--;
1160         spin_unlock(&exp->exp_obd->obd_dev_lock);
1161         atomic_inc(&obd_stale_export_num);
1162
1163         /* A reference is kept by obd_stale_exports list */
1164         obd_stale_export_put(exp);
1165 }
1166 EXPORT_SYMBOL(class_unlink_export);
1167
1168 /* Import management functions */
1169 static void obd_zombie_import_free(struct obd_import *imp)
1170 {
1171         ENTRY;
1172
1173         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1174                 imp->imp_obd->obd_name);
1175
1176         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1177
1178         ptlrpc_put_connection_superhack(imp->imp_connection);
1179
1180         while (!list_empty(&imp->imp_conn_list)) {
1181                 struct obd_import_conn *imp_conn;
1182
1183                 imp_conn = list_entry(imp->imp_conn_list.next,
1184                                       struct obd_import_conn, oic_item);
1185                 list_del_init(&imp_conn->oic_item);
1186                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1187                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1188         }
1189
1190         LASSERT(imp->imp_sec == NULL);
1191         class_decref(imp->imp_obd, "import", imp);
1192         OBD_FREE_PTR(imp);
1193         EXIT;
1194 }
1195
1196 struct obd_import *class_import_get(struct obd_import *import)
1197 {
1198         atomic_inc(&import->imp_refcount);
1199         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1200                atomic_read(&import->imp_refcount),
1201                import->imp_obd->obd_name);
1202         return import;
1203 }
1204 EXPORT_SYMBOL(class_import_get);
1205
1206 void class_import_put(struct obd_import *imp)
1207 {
1208         ENTRY;
1209
1210         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1211
1212         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1213                atomic_read(&imp->imp_refcount) - 1,
1214                imp->imp_obd->obd_name);
1215
1216         if (atomic_dec_and_test(&imp->imp_refcount)) {
1217                 CDEBUG(D_INFO, "final put import %p\n", imp);
1218                 obd_zombie_import_add(imp);
1219         }
1220
1221         EXIT;
1222 }
1223 EXPORT_SYMBOL(class_import_put);
1224
1225 static void init_imp_at(struct imp_at *at) {
1226         int i;
1227         at_init(&at->iat_net_latency, 0, 0);
1228         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1229                 /* max service estimates are tracked on the server side, so
1230                    don't use the AT history here, just use the last reported
1231                    val. (But keep hist for proc histogram, worst_ever) */
1232                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1233                         AT_FLG_NOHIST);
1234         }
1235 }
1236
1237 static void obd_zombie_imp_cull(struct work_struct *ws)
1238 {
1239         struct obd_import *import;
1240
1241         import = container_of(ws, struct obd_import, imp_zombie_work);
1242         obd_zombie_import_free(import);
1243 }
1244
1245 struct obd_import *class_new_import(struct obd_device *obd)
1246 {
1247         struct obd_import *imp;
1248         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1249
1250         OBD_ALLOC(imp, sizeof(*imp));
1251         if (imp == NULL)
1252                 return NULL;
1253
1254         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1255         INIT_LIST_HEAD(&imp->imp_replay_list);
1256         INIT_LIST_HEAD(&imp->imp_sending_list);
1257         INIT_LIST_HEAD(&imp->imp_delayed_list);
1258         INIT_LIST_HEAD(&imp->imp_committed_list);
1259         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1260         imp->imp_known_replied_xid = 0;
1261         imp->imp_replay_cursor = &imp->imp_committed_list;
1262         spin_lock_init(&imp->imp_lock);
1263         imp->imp_last_success_conn = 0;
1264         imp->imp_state = LUSTRE_IMP_NEW;
1265         imp->imp_obd = class_incref(obd, "import", imp);
1266         rwlock_init(&imp->imp_sec_lock);
1267         init_waitqueue_head(&imp->imp_recovery_waitq);
1268         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1269
1270         if (curr_pid_ns && curr_pid_ns->child_reaper)
1271                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1272         else
1273                 imp->imp_sec_refpid = 1;
1274
1275         atomic_set(&imp->imp_refcount, 2);
1276         atomic_set(&imp->imp_unregistering, 0);
1277         atomic_set(&imp->imp_inflight, 0);
1278         atomic_set(&imp->imp_replay_inflight, 0);
1279         atomic_set(&imp->imp_inval_count, 0);
1280         INIT_LIST_HEAD(&imp->imp_conn_list);
1281         init_imp_at(&imp->imp_at);
1282
1283         /* the default magic is V2, will be used in connect RPC, and
1284          * then adjusted according to the flags in request/reply. */
1285         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1286
1287         return imp;
1288 }
1289 EXPORT_SYMBOL(class_new_import);
1290
1291 void class_destroy_import(struct obd_import *import)
1292 {
1293         LASSERT(import != NULL);
1294         LASSERT(import != LP_POISON);
1295
1296         spin_lock(&import->imp_lock);
1297         import->imp_generation++;
1298         spin_unlock(&import->imp_lock);
1299         class_import_put(import);
1300 }
1301 EXPORT_SYMBOL(class_destroy_import);
1302
1303 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1304
1305 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1306 {
1307         spin_lock(&exp->exp_locks_list_guard);
1308
1309         LASSERT(lock->l_exp_refs_nr >= 0);
1310
1311         if (lock->l_exp_refs_target != NULL &&
1312             lock->l_exp_refs_target != exp) {
1313                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1314                               exp, lock, lock->l_exp_refs_target);
1315         }
1316         if ((lock->l_exp_refs_nr ++) == 0) {
1317                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1318                 lock->l_exp_refs_target = exp;
1319         }
1320         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1321                lock, exp, lock->l_exp_refs_nr);
1322         spin_unlock(&exp->exp_locks_list_guard);
1323 }
1324 EXPORT_SYMBOL(__class_export_add_lock_ref);
1325
1326 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1327 {
1328         spin_lock(&exp->exp_locks_list_guard);
1329         LASSERT(lock->l_exp_refs_nr > 0);
1330         if (lock->l_exp_refs_target != exp) {
1331                 LCONSOLE_WARN("lock %p, "
1332                               "mismatching export pointers: %p, %p\n",
1333                               lock, lock->l_exp_refs_target, exp);
1334         }
1335         if (-- lock->l_exp_refs_nr == 0) {
1336                 list_del_init(&lock->l_exp_refs_link);
1337                 lock->l_exp_refs_target = NULL;
1338         }
1339         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1340                lock, exp, lock->l_exp_refs_nr);
1341         spin_unlock(&exp->exp_locks_list_guard);
1342 }
1343 EXPORT_SYMBOL(__class_export_del_lock_ref);
1344 #endif
1345
1346 /* A connection defines an export context in which preallocation can
1347    be managed. This releases the export pointer reference, and returns
1348    the export handle, so the export refcount is 1 when this function
1349    returns. */
1350 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1351                   struct obd_uuid *cluuid)
1352 {
1353         struct obd_export *export;
1354         LASSERT(conn != NULL);
1355         LASSERT(obd != NULL);
1356         LASSERT(cluuid != NULL);
1357         ENTRY;
1358
1359         export = class_new_export(obd, cluuid);
1360         if (IS_ERR(export))
1361                 RETURN(PTR_ERR(export));
1362
1363         conn->cookie = export->exp_handle.h_cookie;
1364         class_export_put(export);
1365
1366         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1367                cluuid->uuid, conn->cookie);
1368         RETURN(0);
1369 }
1370 EXPORT_SYMBOL(class_connect);
1371
1372 /* if export is involved in recovery then clean up related things */
1373 static void class_export_recovery_cleanup(struct obd_export *exp)
1374 {
1375         struct obd_device *obd = exp->exp_obd;
1376
1377         spin_lock(&obd->obd_recovery_task_lock);
1378         if (obd->obd_recovering) {
1379                 if (exp->exp_in_recovery) {
1380                         spin_lock(&exp->exp_lock);
1381                         exp->exp_in_recovery = 0;
1382                         spin_unlock(&exp->exp_lock);
1383                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1384                         atomic_dec(&obd->obd_connected_clients);
1385                 }
1386
1387                 /* if called during recovery then should update
1388                  * obd_stale_clients counter,
1389                  * lightweight exports are not counted */
1390                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1391                         exp->exp_obd->obd_stale_clients++;
1392         }
1393         spin_unlock(&obd->obd_recovery_task_lock);
1394
1395         spin_lock(&exp->exp_lock);
1396         /** Cleanup req replay fields */
1397         if (exp->exp_req_replay_needed) {
1398                 exp->exp_req_replay_needed = 0;
1399
1400                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1401                 atomic_dec(&obd->obd_req_replay_clients);
1402         }
1403
1404         /** Cleanup lock replay data */
1405         if (exp->exp_lock_replay_needed) {
1406                 exp->exp_lock_replay_needed = 0;
1407
1408                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1409                 atomic_dec(&obd->obd_lock_replay_clients);
1410         }
1411         spin_unlock(&exp->exp_lock);
1412 }
1413
1414 /* This function removes 1-3 references from the export:
1415  * 1 - for export pointer passed
1416  * and if disconnect really need
1417  * 2 - removing from hash
1418  * 3 - in client_unlink_export
1419  * The export pointer passed to this function can destroyed */
1420 int class_disconnect(struct obd_export *export)
1421 {
1422         int already_disconnected;
1423         ENTRY;
1424
1425         if (export == NULL) {
1426                 CWARN("attempting to free NULL export %p\n", export);
1427                 RETURN(-EINVAL);
1428         }
1429
1430         spin_lock(&export->exp_lock);
1431         already_disconnected = export->exp_disconnected;
1432         export->exp_disconnected = 1;
1433         /*  We hold references of export for uuid hash
1434          *  and nid_hash and export link at least. So
1435          *  it is safe to call cfs_hash_del in there.  */
1436         if (!hlist_unhashed(&export->exp_nid_hash))
1437                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1438                              &export->exp_connection->c_peer.nid,
1439                              &export->exp_nid_hash);
1440         spin_unlock(&export->exp_lock);
1441
1442         /* class_cleanup(), abort_recovery(), and class_fail_export()
1443          * all end up in here, and if any of them race we shouldn't
1444          * call extra class_export_puts(). */
1445         if (already_disconnected) {
1446                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1447                 GOTO(no_disconn, already_disconnected);
1448         }
1449
1450         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1451                export->exp_handle.h_cookie);
1452
1453         class_export_recovery_cleanup(export);
1454         class_unlink_export(export);
1455 no_disconn:
1456         class_export_put(export);
1457         RETURN(0);
1458 }
1459 EXPORT_SYMBOL(class_disconnect);
1460
1461 /* Return non-zero for a fully connected export */
1462 int class_connected_export(struct obd_export *exp)
1463 {
1464         int connected = 0;
1465
1466         if (exp) {
1467                 spin_lock(&exp->exp_lock);
1468                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1469                 spin_unlock(&exp->exp_lock);
1470         }
1471         return connected;
1472 }
1473 EXPORT_SYMBOL(class_connected_export);
1474
1475 static void class_disconnect_export_list(struct list_head *list,
1476                                          enum obd_option flags)
1477 {
1478         int rc;
1479         struct obd_export *exp;
1480         ENTRY;
1481
1482         /* It's possible that an export may disconnect itself, but
1483          * nothing else will be added to this list. */
1484         while (!list_empty(list)) {
1485                 exp = list_entry(list->next, struct obd_export,
1486                                  exp_obd_chain);
1487                 /* need for safe call CDEBUG after obd_disconnect */
1488                 class_export_get(exp);
1489
1490                 spin_lock(&exp->exp_lock);
1491                 exp->exp_flags = flags;
1492                 spin_unlock(&exp->exp_lock);
1493
1494                 if (obd_uuid_equals(&exp->exp_client_uuid,
1495                                     &exp->exp_obd->obd_uuid)) {
1496                         CDEBUG(D_HA,
1497                                "exp %p export uuid == obd uuid, don't discon\n",
1498                                exp);
1499                         /* Need to delete this now so we don't end up pointing
1500                          * to work_list later when this export is cleaned up. */
1501                         list_del_init(&exp->exp_obd_chain);
1502                         class_export_put(exp);
1503                         continue;
1504                 }
1505
1506                 class_export_get(exp);
1507                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1508                        "last request at %lld\n",
1509                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1510                        exp, exp->exp_last_request_time);
1511                 /* release one export reference anyway */
1512                 rc = obd_disconnect(exp);
1513
1514                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1515                        obd_export_nid2str(exp), exp, rc);
1516                 class_export_put(exp);
1517         }
1518         EXIT;
1519 }
1520
1521 void class_disconnect_exports(struct obd_device *obd)
1522 {
1523         LIST_HEAD(work_list);
1524         ENTRY;
1525
1526         /* Move all of the exports from obd_exports to a work list, en masse. */
1527         spin_lock(&obd->obd_dev_lock);
1528         list_splice_init(&obd->obd_exports, &work_list);
1529         list_splice_init(&obd->obd_delayed_exports, &work_list);
1530         spin_unlock(&obd->obd_dev_lock);
1531
1532         if (!list_empty(&work_list)) {
1533                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1534                        "disconnecting them\n", obd->obd_minor, obd);
1535                 class_disconnect_export_list(&work_list,
1536                                              exp_flags_from_obd(obd));
1537         } else
1538                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1539                        obd->obd_minor, obd);
1540         EXIT;
1541 }
1542 EXPORT_SYMBOL(class_disconnect_exports);
1543
1544 /* Remove exports that have not completed recovery.
1545  */
1546 void class_disconnect_stale_exports(struct obd_device *obd,
1547                                     int (*test_export)(struct obd_export *))
1548 {
1549         LIST_HEAD(work_list);
1550         struct obd_export *exp, *n;
1551         int evicted = 0;
1552         ENTRY;
1553
1554         spin_lock(&obd->obd_dev_lock);
1555         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1556                                  exp_obd_chain) {
1557                 /* don't count self-export as client */
1558                 if (obd_uuid_equals(&exp->exp_client_uuid,
1559                                     &exp->exp_obd->obd_uuid))
1560                         continue;
1561
1562                 /* don't evict clients which have no slot in last_rcvd
1563                  * (e.g. lightweight connection) */
1564                 if (exp->exp_target_data.ted_lr_idx == -1)
1565                         continue;
1566
1567                 spin_lock(&exp->exp_lock);
1568                 if (exp->exp_failed || test_export(exp)) {
1569                         spin_unlock(&exp->exp_lock);
1570                         continue;
1571                 }
1572                 exp->exp_failed = 1;
1573                 spin_unlock(&exp->exp_lock);
1574
1575                 list_move(&exp->exp_obd_chain, &work_list);
1576                 evicted++;
1577                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1578                        obd->obd_name, exp->exp_client_uuid.uuid,
1579                        obd_export_nid2str(exp));
1580                 print_export_data(exp, "EVICTING", 0, D_HA);
1581         }
1582         spin_unlock(&obd->obd_dev_lock);
1583
1584         if (evicted)
1585                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1586                               obd->obd_name, evicted);
1587
1588         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1589                                                  OBD_OPT_ABORT_RECOV);
1590         EXIT;
1591 }
1592 EXPORT_SYMBOL(class_disconnect_stale_exports);
1593
1594 void class_fail_export(struct obd_export *exp)
1595 {
1596         int rc, already_failed;
1597
1598         spin_lock(&exp->exp_lock);
1599         already_failed = exp->exp_failed;
1600         exp->exp_failed = 1;
1601         spin_unlock(&exp->exp_lock);
1602
1603         if (already_failed) {
1604                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1605                        exp, exp->exp_client_uuid.uuid);
1606                 return;
1607         }
1608
1609         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1610                exp, exp->exp_client_uuid.uuid);
1611
1612         if (obd_dump_on_timeout)
1613                 libcfs_debug_dumplog();
1614
1615         /* need for safe call CDEBUG after obd_disconnect */
1616         class_export_get(exp);
1617
1618         /* Most callers into obd_disconnect are removing their own reference
1619          * (request, for example) in addition to the one from the hash table.
1620          * We don't have such a reference here, so make one. */
1621         class_export_get(exp);
1622         rc = obd_disconnect(exp);
1623         if (rc)
1624                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1625         else
1626                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1627                        exp, exp->exp_client_uuid.uuid);
1628         class_export_put(exp);
1629 }
1630 EXPORT_SYMBOL(class_fail_export);
1631
1632 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1633 {
1634         struct cfs_hash *nid_hash;
1635         struct obd_export *doomed_exp = NULL;
1636         int exports_evicted = 0;
1637
1638         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1639
1640         spin_lock(&obd->obd_dev_lock);
1641         /* umount has run already, so evict thread should leave
1642          * its task to umount thread now */
1643         if (obd->obd_stopping) {
1644                 spin_unlock(&obd->obd_dev_lock);
1645                 return exports_evicted;
1646         }
1647         nid_hash = obd->obd_nid_hash;
1648         cfs_hash_getref(nid_hash);
1649         spin_unlock(&obd->obd_dev_lock);
1650
1651         do {
1652                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1653                 if (doomed_exp == NULL)
1654                         break;
1655
1656                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1657                          "nid %s found, wanted nid %s, requested nid %s\n",
1658                          obd_export_nid2str(doomed_exp),
1659                          libcfs_nid2str(nid_key), nid);
1660                 LASSERTF(doomed_exp != obd->obd_self_export,
1661                          "self-export is hashed by NID?\n");
1662                 exports_evicted++;
1663                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1664                               "request\n", obd->obd_name,
1665                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1666                               obd_export_nid2str(doomed_exp));
1667                 class_fail_export(doomed_exp);
1668                 class_export_put(doomed_exp);
1669         } while (1);
1670
1671         cfs_hash_putref(nid_hash);
1672
1673         if (!exports_evicted)
1674                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1675                        obd->obd_name, nid);
1676         return exports_evicted;
1677 }
1678 EXPORT_SYMBOL(obd_export_evict_by_nid);
1679
1680 #ifdef HAVE_SERVER_SUPPORT
1681 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1682 {
1683         struct obd_export *doomed_exp = NULL;
1684         struct obd_uuid doomed_uuid;
1685         int exports_evicted = 0;
1686
1687         spin_lock(&obd->obd_dev_lock);
1688         if (obd->obd_stopping) {
1689                 spin_unlock(&obd->obd_dev_lock);
1690                 return exports_evicted;
1691         }
1692         spin_unlock(&obd->obd_dev_lock);
1693
1694         obd_str2uuid(&doomed_uuid, uuid);
1695         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1696                 CERROR("%s: can't evict myself\n", obd->obd_name);
1697                 return exports_evicted;
1698         }
1699
1700         doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1701         if (doomed_exp == NULL) {
1702                 CERROR("%s: can't disconnect %s: no exports found\n",
1703                        obd->obd_name, uuid);
1704         } else {
1705                 CWARN("%s: evicting %s at adminstrative request\n",
1706                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1707                 class_fail_export(doomed_exp);
1708                 class_export_put(doomed_exp);
1709                 obd_uuid_del(obd, doomed_exp);
1710                 exports_evicted++;
1711         }
1712
1713         return exports_evicted;
1714 }
1715 #endif /* HAVE_SERVER_SUPPORT */
1716
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() for each export when a
 * lock dump is requested.  Left unset (NULL, by static zero-initialisation)
 * until some other code installs a handler. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1721
1722 static void print_export_data(struct obd_export *exp, const char *status,
1723                               int locks, int debug_level)
1724 {
1725         struct ptlrpc_reply_state *rs;
1726         struct ptlrpc_reply_state *first_reply = NULL;
1727         int nreplies = 0;
1728
1729         spin_lock(&exp->exp_lock);
1730         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1731                             rs_exp_list) {
1732                 if (nreplies == 0)
1733                         first_reply = rs;
1734                 nreplies++;
1735         }
1736         spin_unlock(&exp->exp_lock);
1737
1738         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1739                "%p %s %llu stale:%d\n",
1740                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1741                obd_export_nid2str(exp),
1742                refcount_read(&exp->exp_handle.h_ref),
1743                atomic_read(&exp->exp_rpc_count),
1744                atomic_read(&exp->exp_cb_count),
1745                atomic_read(&exp->exp_locks_count),
1746                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1747                nreplies, first_reply, nreplies > 3 ? "..." : "",
1748                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1749 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1750         if (locks && class_export_dump_hook != NULL)
1751                 class_export_dump_hook(exp);
1752 #endif
1753 }
1754
1755 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1756 {
1757         struct obd_export *exp;
1758
1759         spin_lock(&obd->obd_dev_lock);
1760         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1761                 print_export_data(exp, "ACTIVE", locks, debug_level);
1762         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1763                 print_export_data(exp, "UNLINKED", locks, debug_level);
1764         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1765                 print_export_data(exp, "DELAYED", locks, debug_level);
1766         spin_unlock(&obd->obd_dev_lock);
1767 }
1768
1769 void obd_exports_barrier(struct obd_device *obd)
1770 {
1771         int waited = 2;
1772         LASSERT(list_empty(&obd->obd_exports));
1773         spin_lock(&obd->obd_dev_lock);
1774         while (!list_empty(&obd->obd_unlinked_exports)) {
1775                 spin_unlock(&obd->obd_dev_lock);
1776                 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1777                 if (waited > 5 && is_power_of_2(waited)) {
1778                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1779                                       "more than %d seconds. "
1780                                       "The obd refcount = %d. Is it stuck?\n",
1781                                       obd->obd_name, waited,
1782                                       atomic_read(&obd->obd_refcount));
1783                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1784                 }
1785                 waited *= 2;
1786                 spin_lock(&obd->obd_dev_lock);
1787         }
1788         spin_unlock(&obd->obd_dev_lock);
1789 }
1790 EXPORT_SYMBOL(obd_exports_barrier);
1791
1792 /**
1793  * Add export to the obd_zombe thread and notify it.
1794  */
1795 static void obd_zombie_export_add(struct obd_export *exp) {
1796         atomic_dec(&obd_stale_export_num);
1797         spin_lock(&exp->exp_obd->obd_dev_lock);
1798         LASSERT(!list_empty(&exp->exp_obd_chain));
1799         list_del_init(&exp->exp_obd_chain);
1800         spin_unlock(&exp->exp_obd->obd_dev_lock);
1801
1802         queue_work(zombie_wq, &exp->exp_zombie_work);
1803 }
1804
1805 /**
1806  * Add import to the obd_zombe thread and notify it.
1807  */
1808 static void obd_zombie_import_add(struct obd_import *imp) {
1809         LASSERT(imp->imp_sec == NULL);
1810
1811         queue_work(zombie_wq, &imp->imp_zombie_work);
1812 }
1813
1814 /**
1815  * wait when obd_zombie import/export queues become empty
1816  */
1817 void obd_zombie_barrier(void)
1818 {
1819         flush_workqueue(zombie_wq);
1820 }
1821 EXPORT_SYMBOL(obd_zombie_barrier);
1822
1823
1824 struct obd_export *obd_stale_export_get(void)
1825 {
1826         struct obd_export *exp = NULL;
1827         ENTRY;
1828
1829         spin_lock(&obd_stale_export_lock);
1830         if (!list_empty(&obd_stale_exports)) {
1831                 exp = list_entry(obd_stale_exports.next,
1832                                  struct obd_export, exp_stale_list);
1833                 list_del_init(&exp->exp_stale_list);
1834         }
1835         spin_unlock(&obd_stale_export_lock);
1836
1837         if (exp) {
1838                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1839                        atomic_read(&obd_stale_export_num));
1840         }
1841         RETURN(exp);
1842 }
1843 EXPORT_SYMBOL(obd_stale_export_get);
1844
1845 void obd_stale_export_put(struct obd_export *exp)
1846 {
1847         ENTRY;
1848
1849         LASSERT(list_empty(&exp->exp_stale_list));
1850         if (exp->exp_lock_hash &&
1851             atomic_read(&exp->exp_lock_hash->hs_count)) {
1852                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1853                        atomic_read(&obd_stale_export_num));
1854
1855                 spin_lock_bh(&exp->exp_bl_list_lock);
1856                 spin_lock(&obd_stale_export_lock);
1857                 /* Add to the tail if there is no blocked locks,
1858                  * to the head otherwise. */
1859                 if (list_empty(&exp->exp_bl_list))
1860                         list_add_tail(&exp->exp_stale_list,
1861                                       &obd_stale_exports);
1862                 else
1863                         list_add(&exp->exp_stale_list,
1864                                  &obd_stale_exports);
1865
1866                 spin_unlock(&obd_stale_export_lock);
1867                 spin_unlock_bh(&exp->exp_bl_list_lock);
1868         } else {
1869                 class_export_put(exp);
1870         }
1871         EXIT;
1872 }
1873 EXPORT_SYMBOL(obd_stale_export_put);
1874
1875 /**
1876  * Adjust the position of the export in the stale list,
1877  * i.e. move to the head of the list if is needed.
1878  **/
1879 void obd_stale_export_adjust(struct obd_export *exp)
1880 {
1881         LASSERT(exp != NULL);
1882         spin_lock_bh(&exp->exp_bl_list_lock);
1883         spin_lock(&obd_stale_export_lock);
1884
1885         if (!list_empty(&exp->exp_stale_list) &&
1886             !list_empty(&exp->exp_bl_list))
1887                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1888
1889         spin_unlock(&obd_stale_export_lock);
1890         spin_unlock_bh(&exp->exp_bl_list_lock);
1891 }
1892 EXPORT_SYMBOL(obd_stale_export_adjust);
1893
1894 /**
1895  * start destroy zombie import/export thread
1896  */
1897 int obd_zombie_impexp_init(void)
1898 {
1899         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1900         if (!zombie_wq)
1901                 return -ENOMEM;
1902
1903         return 0;
1904 }
1905
1906 /**
1907  * stop destroy zombie import/export thread
1908  */
1909 void obd_zombie_impexp_stop(void)
1910 {
1911         destroy_workqueue(zombie_wq);
1912         LASSERT(list_empty(&obd_stale_exports));
1913 }
1914
1915 /***** Kernel-userspace comm helpers *******/
1916
1917 /* Get length of entire message, including header */
1918 int kuc_len(int payload_len)
1919 {
1920         return sizeof(struct kuc_hdr) + payload_len;
1921 }
1922 EXPORT_SYMBOL(kuc_len);
1923
1924 /* Get a pointer to kuc header, given a ptr to the payload
1925  * @param p Pointer to payload area
1926  * @returns Pointer to kuc header
1927  */
1928 struct kuc_hdr * kuc_ptr(void *p)
1929 {
1930         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1931         LASSERT(lh->kuc_magic == KUC_MAGIC);
1932         return lh;
1933 }
1934 EXPORT_SYMBOL(kuc_ptr);
1935
1936 /* Alloc space for a message, and fill in header
1937  * @return Pointer to payload area
1938  */
1939 void *kuc_alloc(int payload_len, int transport, int type)
1940 {
1941         struct kuc_hdr *lh;
1942         int len = kuc_len(payload_len);
1943
1944         OBD_ALLOC(lh, len);
1945         if (lh == NULL)
1946                 return ERR_PTR(-ENOMEM);
1947
1948         lh->kuc_magic = KUC_MAGIC;
1949         lh->kuc_transport = transport;
1950         lh->kuc_msgtype = type;
1951         lh->kuc_msglen = len;
1952
1953         return (void *)(lh + 1);
1954 }
1955 EXPORT_SYMBOL(kuc_alloc);
1956
/* Free a message obtained from kuc_alloc().  @p is the payload pointer and
 * @payload_len the payload size that was requested at allocation time.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);
	int total = kuc_len(payload_len);

	OBD_FREE(hdr, total);
}
EXPORT_SYMBOL(kuc_free);
1964
1965 struct obd_request_slot_waiter {
1966         struct list_head        orsw_entry;
1967         wait_queue_head_t       orsw_waitq;
1968         bool                    orsw_signaled;
1969 };
1970
1971 static bool obd_request_slot_avail(struct client_obd *cli,
1972                                    struct obd_request_slot_waiter *orsw)
1973 {
1974         bool avail;
1975
1976         spin_lock(&cli->cl_loi_list_lock);
1977         avail = !!list_empty(&orsw->orsw_entry);
1978         spin_unlock(&cli->cl_loi_list_lock);
1979
1980         return avail;
1981 };
1982
1983 /*
1984  * For network flow control, the RPC sponsor needs to acquire a credit
1985  * before sending the RPC. The credits count for a connection is defined
1986  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1987  * the subsequent RPC sponsors need to wait until others released their
1988  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1989  */
1990 int obd_get_request_slot(struct client_obd *cli)
1991 {
1992         struct obd_request_slot_waiter   orsw;
1993         int                              rc;
1994
1995         spin_lock(&cli->cl_loi_list_lock);
1996         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1997                 cli->cl_rpcs_in_flight++;
1998                 spin_unlock(&cli->cl_loi_list_lock);
1999                 return 0;
2000         }
2001
2002         init_waitqueue_head(&orsw.orsw_waitq);
2003         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2004         orsw.orsw_signaled = false;
2005         spin_unlock(&cli->cl_loi_list_lock);
2006
2007         rc = l_wait_event_abortable(orsw.orsw_waitq,
2008                                     obd_request_slot_avail(cli, &orsw) ||
2009                                     orsw.orsw_signaled);
2010
2011         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2012          * freed but other (such as obd_put_request_slot) is using it. */
2013         spin_lock(&cli->cl_loi_list_lock);
2014         if (rc != 0) {
2015                 if (!orsw.orsw_signaled) {
2016                         if (list_empty(&orsw.orsw_entry))
2017                                 cli->cl_rpcs_in_flight--;
2018                         else
2019                                 list_del(&orsw.orsw_entry);
2020                 }
2021                 rc = -EINTR;
2022         }
2023
2024         if (orsw.orsw_signaled) {
2025                 LASSERT(list_empty(&orsw.orsw_entry));
2026
2027                 rc = -EINTR;
2028         }
2029         spin_unlock(&cli->cl_loi_list_lock);
2030
2031         return rc;
2032 }
2033 EXPORT_SYMBOL(obd_get_request_slot);
2034
2035 void obd_put_request_slot(struct client_obd *cli)
2036 {
2037         struct obd_request_slot_waiter *orsw;
2038
2039         spin_lock(&cli->cl_loi_list_lock);
2040         cli->cl_rpcs_in_flight--;
2041
2042         /* If there is free slot, wakeup the first waiter. */
2043         if (!list_empty(&cli->cl_flight_waiters) &&
2044             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2045                 orsw = list_entry(cli->cl_flight_waiters.next,
2046                                   struct obd_request_slot_waiter, orsw_entry);
2047                 list_del_init(&orsw->orsw_entry);
2048                 cli->cl_rpcs_in_flight++;
2049                 wake_up(&orsw->orsw_waitq);
2050         }
2051         spin_unlock(&cli->cl_loi_list_lock);
2052 }
2053 EXPORT_SYMBOL(obd_put_request_slot);
2054
2055 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2056 {
2057         return cli->cl_max_rpcs_in_flight;
2058 }
2059 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2060
2061 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2062 {
2063         struct obd_request_slot_waiter *orsw;
2064         __u32                           old;
2065         int                             diff;
2066         int                             i;
2067         const char *type_name;
2068         int                             rc;
2069
2070         if (max > OBD_MAX_RIF_MAX || max < 1)
2071                 return -ERANGE;
2072
2073         type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2074         if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2075                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2076                  * strictly lower that max_rpcs_in_flight */
2077                 if (max < 2) {
2078                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2079                                "because it must be higher than "
2080                                "max_mod_rpcs_in_flight value",
2081                                cli->cl_import->imp_obd->obd_name);
2082                         return -ERANGE;
2083                 }
2084                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2085                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2086                         if (rc != 0)
2087                                 return rc;
2088                 }
2089         }
2090
2091         spin_lock(&cli->cl_loi_list_lock);
2092         old = cli->cl_max_rpcs_in_flight;
2093         cli->cl_max_rpcs_in_flight = max;
2094         client_adjust_max_dirty(cli);
2095
2096         diff = max - old;
2097
2098         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2099         for (i = 0; i < diff; i++) {
2100                 if (list_empty(&cli->cl_flight_waiters))
2101                         break;
2102
2103                 orsw = list_entry(cli->cl_flight_waiters.next,
2104                                   struct obd_request_slot_waiter, orsw_entry);
2105                 list_del_init(&orsw->orsw_entry);
2106                 cli->cl_rpcs_in_flight++;
2107                 wake_up(&orsw->orsw_waitq);
2108         }
2109         spin_unlock(&cli->cl_loi_list_lock);
2110
2111         return 0;
2112 }
2113 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2114
2115 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2116 {
2117         return cli->cl_max_mod_rpcs_in_flight;
2118 }
2119 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2120
2121 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2122 {
2123         struct obd_connect_data *ocd;
2124         __u16 maxmodrpcs;
2125         __u16 prev;
2126
2127         if (max > OBD_MAX_RIF_MAX || max < 1)
2128                 return -ERANGE;
2129
2130         /* cannot exceed or equal max_rpcs_in_flight */
2131         if (max >= cli->cl_max_rpcs_in_flight) {
2132                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2133                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2134                        cli->cl_import->imp_obd->obd_name,
2135                        max, cli->cl_max_rpcs_in_flight);
2136                 return -ERANGE;
2137         }
2138
2139         /* cannot exceed max modify RPCs in flight supported by the server */
2140         ocd = &cli->cl_import->imp_connect_data;
2141         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2142                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2143         else
2144                 maxmodrpcs = 1;
2145         if (max > maxmodrpcs) {
2146                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2147                        "higher than max_mod_rpcs_per_client value (%hu) "
2148                        "returned by the server at connection\n",
2149                        cli->cl_import->imp_obd->obd_name,
2150                        max, maxmodrpcs);
2151                 return -ERANGE;
2152         }
2153
2154         spin_lock(&cli->cl_mod_rpcs_lock);
2155
2156         prev = cli->cl_max_mod_rpcs_in_flight;
2157         cli->cl_max_mod_rpcs_in_flight = max;
2158
2159         /* wakeup waiters if limit has been increased */
2160         if (cli->cl_max_mod_rpcs_in_flight > prev)
2161                 wake_up(&cli->cl_mod_rpcs_waitq);
2162
2163         spin_unlock(&cli->cl_mod_rpcs_lock);
2164
2165         return 0;
2166 }
2167 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2168
2169 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2170                                struct seq_file *seq)
2171 {
2172         unsigned long mod_tot = 0, mod_cum;
2173         struct timespec64 now;
2174         int i;
2175
2176         ktime_get_real_ts64(&now);
2177
2178         spin_lock(&cli->cl_mod_rpcs_lock);
2179
2180         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2181                    (s64)now.tv_sec, now.tv_nsec);
2182         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2183                    cli->cl_mod_rpcs_in_flight);
2184
2185         seq_printf(seq, "\n\t\t\tmodify\n");
2186         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2187
2188         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2189
2190         mod_cum = 0;
2191         for (i = 0; i < OBD_HIST_MAX; i++) {
2192                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2193                 mod_cum += mod;
2194                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2195                            i, mod, pct(mod, mod_tot),
2196                            pct(mod_cum, mod_tot));
2197                 if (mod_cum == mod_tot)
2198                         break;
2199         }
2200
2201         spin_unlock(&cli->cl_mod_rpcs_lock);
2202
2203         return 0;
2204 }
2205 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2206
2207 /* The number of modify RPCs sent in parallel is limited
2208  * because the server has a finite number of slots per client to
2209  * store request result and ensure reply reconstruction when needed.
2210  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2211  * that takes into account server limit and cl_max_rpcs_in_flight
2212  * value.
2213  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2214  * one close request is allowed above the maximum.
2215  */
2216 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2217                                                  bool close_req)
2218 {
2219         bool avail;
2220
2221         /* A slot is available if
2222          * - number of modify RPCs in flight is less than the max
2223          * - it's a close RPC and no other close request is in flight
2224          */
2225         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2226                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2227
2228         return avail;
2229 }
2230
2231 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2232                                          bool close_req)
2233 {
2234         bool avail;
2235
2236         spin_lock(&cli->cl_mod_rpcs_lock);
2237         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2238         spin_unlock(&cli->cl_mod_rpcs_lock);
2239         return avail;
2240 }
2241
2242
2243 /* Get a modify RPC slot from the obd client @cli according
2244  * to the kind of operation @opc that is going to be sent
2245  * and the intent @it of the operation if it applies.
2246  * If the maximum number of modify RPCs in flight is reached
2247  * the thread is put to sleep.
2248  * Returns the tag to be set in the request message. Tag 0
2249  * is reserved for non-modifying requests.
2250  */
2251 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2252 {
2253         bool                    close_req = false;
2254         __u16                   i, max;
2255
2256         if (opc == MDS_CLOSE)
2257                 close_req = true;
2258
2259         do {
2260                 spin_lock(&cli->cl_mod_rpcs_lock);
2261                 max = cli->cl_max_mod_rpcs_in_flight;
2262                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2263                         /* there is a slot available */
2264                         cli->cl_mod_rpcs_in_flight++;
2265                         if (close_req)
2266                                 cli->cl_close_rpcs_in_flight++;
2267                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2268                                          cli->cl_mod_rpcs_in_flight);
2269                         /* find a free tag */
2270                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2271                                                 max + 1);
2272                         LASSERT(i < OBD_MAX_RIF_MAX);
2273                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2274                         spin_unlock(&cli->cl_mod_rpcs_lock);
2275                         /* tag 0 is reserved for non-modify RPCs */
2276
2277                         CDEBUG(D_RPCTRACE,
2278                                "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2279                                cli->cl_import->imp_obd->obd_name,
2280                                i + 1, opc, max);
2281
2282                         return i + 1;
2283                 }
2284                 spin_unlock(&cli->cl_mod_rpcs_lock);
2285
2286                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2287                        "opc %u, max %hu\n",
2288                        cli->cl_import->imp_obd->obd_name, opc, max);
2289
2290                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2291                                           obd_mod_rpc_slot_avail(cli,
2292                                                                  close_req));
2293         } while (true);
2294 }
2295 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2296
2297 /* Put a modify RPC slot from the obd client @cli according
2298  * to the kind of operation @opc that has been sent.
2299  */
2300 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2301 {
2302         bool                    close_req = false;
2303
2304         if (tag == 0)
2305                 return;
2306
2307         if (opc == MDS_CLOSE)
2308                 close_req = true;
2309
2310         spin_lock(&cli->cl_mod_rpcs_lock);
2311         cli->cl_mod_rpcs_in_flight--;
2312         if (close_req)
2313                 cli->cl_close_rpcs_in_flight--;
2314         /* release the tag in the bitmap */
2315         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2316         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2317         spin_unlock(&cli->cl_mod_rpcs_lock);
2318         wake_up(&cli->cl_mod_rpcs_waitq);
2319 }
2320 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2321