Whamcloud - gitweb
LU-4456 osp: extra check for opd_pre
[fs/lustre-release.git] / lustre / osp / osp_precreate.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osp/osp_precreate.c
37  *
38  * Lustre OST Proxy Device
39  *
40  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
41  * Author: Mikhail Pershin <mike.pershin@intel.com>
42  * Author: Di Wang <di.wang@intel.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MDS
46
47 #include "osp_internal.h"
48
49 /*
50  * there are two specific states to take care about:
51  *
52  * = import is disconnected =
53  *
54  * = import is inactive =
55  *   in this case osp_declare_object_create() returns an error
56  *
57  */
58
59 /*
60  * statfs
61  */
62 static inline int osp_statfs_need_update(struct osp_device *d)
63 {
64         return !cfs_time_before(cfs_time_current(),
65                                 d->opd_statfs_fresh_till);
66 }
67
68 /*
69  * OSP tries to maintain pool of available objects so that calls to create
70  * objects don't block most of time
71  *
72  * each time OSP gets connected to OST, we should start from precreation cleanup
73  */
74 static inline bool osp_precreate_running(struct osp_device *d)
75 {
76         return !!(d->opd_pre_thread.t_flags & SVC_RUNNING);
77 }
78
79 static inline bool osp_precreate_stopped(struct osp_device *d)
80 {
81         return !!(d->opd_pre_thread.t_flags & SVC_STOPPED);
82 }
83
84 static void osp_statfs_timer_cb(unsigned long _d)
85 {
86         struct osp_device *d = (struct osp_device *) _d;
87
88         LASSERT(d);
89         if (d->opd_pre != NULL && osp_precreate_running(d))
90                 wake_up(&d->opd_pre_waitq);
91 }
92
93 static int osp_statfs_interpret(const struct lu_env *env,
94                                 struct ptlrpc_request *req,
95                                 union ptlrpc_async_args *aa, int rc)
96 {
97         struct obd_import       *imp = req->rq_import;
98         struct obd_statfs       *msfs;
99         struct osp_device       *d;
100
101         ENTRY;
102
103         aa = ptlrpc_req_async_args(req);
104         d = aa->pointer_arg[0];
105         LASSERT(d);
106
107         if (rc != 0)
108                 GOTO(out, rc);
109
110         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
111         if (msfs == NULL)
112                 GOTO(out, rc = -EPROTO);
113
114         d->opd_statfs = *msfs;
115
116         osp_pre_update_status(d, rc);
117
118         /* schedule next update */
119         d->opd_statfs_fresh_till = cfs_time_shift(d->opd_statfs_maxage);
120         cfs_timer_arm(&d->opd_statfs_timer, d->opd_statfs_fresh_till);
121         d->opd_statfs_update_in_progress = 0;
122
123         CDEBUG(D_CACHE, "updated statfs %p\n", d);
124
125         RETURN(0);
126 out:
127         /* couldn't update statfs, try again as soon as possible */
128         if (d->opd_pre != NULL && osp_precreate_running(d))
129                 wake_up(&d->opd_pre_waitq);
130
131         if (req->rq_import_generation == imp->imp_generation)
132                 CDEBUG(D_CACHE, "%s: couldn't update statfs: rc = %d\n",
133                        d->opd_obd->obd_name, rc);
134         RETURN(rc);
135 }
136
137 static int osp_statfs_update(struct osp_device *d)
138 {
139         struct ptlrpc_request   *req;
140         struct obd_import       *imp;
141         union ptlrpc_async_args *aa;
142         int                      rc;
143
144         ENTRY;
145
146         CDEBUG(D_CACHE, "going to update statfs\n");
147
148         imp = d->opd_obd->u.cli.cl_import;
149         LASSERT(imp);
150
151         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
152         if (req == NULL)
153                 RETURN(-ENOMEM);
154
155         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
156         if (rc) {
157                 ptlrpc_request_free(req);
158                 RETURN(rc);
159         }
160         ptlrpc_request_set_replen(req);
161         req->rq_request_portal = OST_CREATE_PORTAL;
162         ptlrpc_at_set_req_timeout(req);
163
164         req->rq_interpret_reply = (ptlrpc_interpterer_t)osp_statfs_interpret;
165         aa = ptlrpc_req_async_args(req);
166         aa->pointer_arg[0] = d;
167
168         /*
169          * no updates till reply
170          */
171         cfs_timer_disarm(&d->opd_statfs_timer);
172         d->opd_statfs_fresh_till = cfs_time_shift(obd_timeout * 1000);
173         d->opd_statfs_update_in_progress = 1;
174
175         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
176
177         RETURN(0);
178 }
179
180 /*
181  * XXX: there might be a case where removed object(s) do not add free
182  * space (empty object). if the number of such deletions is high, then
183  * we can start to update statfs too often - a rpc storm
184  * TODO: some throttling is needed
185  */
186 void osp_statfs_need_now(struct osp_device *d)
187 {
188         if (!d->opd_statfs_update_in_progress) {
189                 /*
190                  * if current status is -ENOSPC (lack of free space on OST)
191                  * then we should poll OST immediately once object destroy
192                  * is replied
193                  */
194                 d->opd_statfs_fresh_till = cfs_time_shift(-1);
195                 cfs_timer_disarm(&d->opd_statfs_timer);
196                 wake_up(&d->opd_pre_waitq);
197         }
198 }
199
200 static inline int osp_objs_precreated(const struct lu_env *env,
201                                       struct osp_device *osp)
202 {
203         struct lu_fid *fid1 = &osp->opd_pre_last_created_fid;
204         struct lu_fid *fid2 = &osp->opd_pre_used_fid;
205
206         LASSERTF(fid_seq(fid1) == fid_seq(fid2),
207                  "Created fid"DFID" Next fid "DFID"\n", PFID(fid1), PFID(fid2));
208
209         if (fid_is_idif(fid1)) {
210                 struct ost_id *oi1 = &osp_env_info(env)->osi_oi;
211                 struct ost_id *oi2 = &osp_env_info(env)->osi_oi2;
212
213                 LASSERT(fid_is_idif(fid1) && fid_is_idif(fid2));
214                 fid_to_ostid(fid1, oi1);
215                 fid_to_ostid(fid2, oi2);
216                 LASSERT(ostid_id(oi1) >= ostid_id(oi2));
217
218                 return ostid_id(oi1) - ostid_id(oi2);
219         }
220
221         return fid_oid(fid1) - fid_oid(fid2);
222 }
223
224 static inline int osp_precreate_near_empty_nolock(const struct lu_env *env,
225                                                   struct osp_device *d)
226 {
227         int window = osp_objs_precreated(env, d);
228
229         /* don't consider new precreation till OST is healty and
230          * has free space */
231         return ((window - d->opd_pre_reserved < d->opd_pre_grow_count / 2) &&
232                 (d->opd_pre_status == 0));
233 }
234
235 static inline int osp_precreate_near_empty(const struct lu_env *env,
236                                            struct osp_device *d)
237 {
238         int rc;
239
240         /* XXX: do we really need locking here? */
241         spin_lock(&d->opd_pre_lock);
242         rc = osp_precreate_near_empty_nolock(env, d);
243         spin_unlock(&d->opd_pre_lock);
244         return rc;
245 }
246
247 static inline int osp_create_end_seq(const struct lu_env *env,
248                                      struct osp_device *osp)
249 {
250         struct lu_fid *fid = &osp->opd_pre_used_fid;
251         int rc;
252
253         spin_lock(&osp->opd_pre_lock);
254         rc = osp_fid_end_seq(env, fid);
255         spin_unlock(&osp->opd_pre_lock);
256         return rc;
257 }
258
259 /**
260  * Write fid into last_oid/last_seq file.
261  **/
262 int osp_write_last_oid_seq_files(struct lu_env *env, struct osp_device *osp,
263                                  struct lu_fid *fid, int sync)
264 {
265         struct osp_thread_info  *oti = osp_env_info(env);
266         struct lu_buf      *lb_oid = &oti->osi_lb;
267         struct lu_buf      *lb_oseq = &oti->osi_lb2;
268         loff_t             oid_off;
269         loff_t             oseq_off;
270         struct thandle    *th;
271         int                   rc;
272         ENTRY;
273
274         /* Note: through f_oid is only 32bits, it will also write
275          * 64 bits for oid to keep compatiblity with the previous
276          * version. */
277         lb_oid->lb_buf = &fid->f_oid;
278         lb_oid->lb_len = sizeof(obd_id);
279         oid_off = sizeof(obd_id) * osp->opd_index;
280
281         lb_oseq->lb_buf = &fid->f_seq;
282         lb_oseq->lb_len = sizeof(obd_id);
283         oseq_off = sizeof(obd_id) * osp->opd_index;
284
285         th = dt_trans_create(env, osp->opd_storage);
286         if (IS_ERR(th))
287                 RETURN(PTR_ERR(th));
288
289         th->th_sync |= sync;
290         rc = dt_declare_record_write(env, osp->opd_last_used_oid_file,
291                                      lb_oid->lb_len, oid_off, th);
292         if (rc != 0)
293                 GOTO(out, rc);
294
295         rc = dt_declare_record_write(env, osp->opd_last_used_seq_file,
296                                      lb_oseq->lb_len, oseq_off, th);
297         if (rc != 0)
298                 GOTO(out, rc);
299
300         rc = dt_trans_start_local(env, osp->opd_storage, th);
301         if (rc != 0)
302                 GOTO(out, rc);
303
304         rc = dt_record_write(env, osp->opd_last_used_oid_file, lb_oid,
305                              &oid_off, th);
306         if (rc != 0) {
307                 CERROR("%s: can not write to last seq file: rc = %d\n",
308                         osp->opd_obd->obd_name, rc);
309                 GOTO(out, rc);
310         }
311         rc = dt_record_write(env, osp->opd_last_used_seq_file, lb_oseq,
312                              &oseq_off, th);
313         if (rc) {
314                 CERROR("%s: can not write to last seq file: rc = %d\n",
315                         osp->opd_obd->obd_name, rc);
316                 GOTO(out, rc);
317         }
318 out:
319         dt_trans_stop(env, osp->opd_storage, th);
320         RETURN(rc);
321 }
322
323 int osp_precreate_rollover_new_seq(struct lu_env *env, struct osp_device *osp)
324 {
325         struct lu_fid   *fid = &osp_env_info(env)->osi_fid;
326         struct lu_fid   *last_fid = &osp->opd_last_used_fid;
327         int             rc;
328         ENTRY;
329
330         rc = seq_client_get_seq(env, osp->opd_obd->u.cli.cl_seq, &fid->f_seq);
331         if (rc != 0) {
332                 CERROR("%s: alloc fid error: rc = %d\n",
333                        osp->opd_obd->obd_name, rc);
334                 RETURN(rc);
335         }
336
337         fid->f_oid = 1;
338         fid->f_ver = 0;
339         LASSERTF(fid_seq(fid) != fid_seq(last_fid),
340                  "fid "DFID", last_fid "DFID"\n", PFID(fid),
341                  PFID(last_fid));
342
343         rc = osp_write_last_oid_seq_files(env, osp, fid, 1);
344         if (rc != 0) {
345                 CERROR("%s: Can not update oid/seq file: rc = %d\n",
346                        osp->opd_obd->obd_name, rc);
347                 RETURN(rc);
348         }
349
350         LCONSOLE_INFO("%s: update sequence from "LPX64" to "LPX64"\n",
351                       osp->opd_obd->obd_name, fid_seq(last_fid),
352                       fid_seq(fid));
353         /* Update last_xxx to the new seq */
354         spin_lock(&osp->opd_pre_lock);
355         osp->opd_last_used_fid = *fid;
356         osp->opd_gap_start_fid = *fid;
357         osp->opd_pre_used_fid = *fid;
358         osp->opd_pre_last_created_fid = *fid;
359         spin_unlock(&osp->opd_pre_lock);
360
361         RETURN(rc);
362 }
363
364 /**
365  * alloc fids for precreation.
366  * rc = 0 Success, @grow is the count of real allocation.
367  * rc = 1 Current seq is used up.
368  * rc < 0 Other error.
369  **/
370 static int osp_precreate_fids(const struct lu_env *env, struct osp_device *osp,
371                               struct lu_fid *fid, int *grow)
372 {
373         struct osp_thread_info  *osi = osp_env_info(env);
374         __u64                   end;
375         int                     i = 0;
376
377         if (fid_is_idif(fid)) {
378                 struct lu_fid   *last_fid;
379                 struct ost_id   *oi = &osi->osi_oi;
380
381                 spin_lock(&osp->opd_pre_lock);
382                 last_fid = &osp->opd_pre_last_created_fid;
383                 fid_to_ostid(last_fid, oi);
384                 end = min(ostid_id(oi) + *grow, IDIF_MAX_OID);
385                 *grow = end - ostid_id(oi);
386                 ostid_set_id(oi, ostid_id(oi) + *grow);
387                 spin_unlock(&osp->opd_pre_lock);
388
389                 if (*grow == 0)
390                         return 1;
391
392                 ostid_to_fid(fid, oi, osp->opd_index);
393                 return 0;
394         }
395
396         spin_lock(&osp->opd_pre_lock);
397         *fid = osp->opd_pre_last_created_fid;
398         end = fid->f_oid;
399         end = min((end + *grow), (__u64)LUSTRE_DATA_SEQ_MAX_WIDTH);
400         *grow = end - fid->f_oid;
401         fid->f_oid += end - fid->f_oid;
402         spin_unlock(&osp->opd_pre_lock);
403
404         CDEBUG(D_INFO, "Expect %d, actual %d ["DFID" -- "DFID"]\n",
405                *grow, i, PFID(fid), PFID(&osp->opd_pre_last_created_fid));
406
407         return *grow > 0 ? 0 : 1;
408 }
409
410 static int osp_precreate_send(const struct lu_env *env, struct osp_device *d)
411 {
412         struct osp_thread_info  *oti = osp_env_info(env);
413         struct ptlrpc_request   *req;
414         struct obd_import       *imp;
415         struct ost_body         *body;
416         int                      rc, grow, diff;
417         struct lu_fid           *fid = &oti->osi_fid;
418         ENTRY;
419
420         /* don't precreate new objects till OST healthy and has free space */
421         if (unlikely(d->opd_pre_status)) {
422                 CDEBUG(D_INFO, "%s: don't send new precreate: rc = %d\n",
423                        d->opd_obd->obd_name, d->opd_pre_status);
424                 RETURN(0);
425         }
426
427         /*
428          * if not connection/initialization is compeleted, ignore
429          */
430         imp = d->opd_obd->u.cli.cl_import;
431         LASSERT(imp);
432
433         req = ptlrpc_request_alloc(imp, &RQF_OST_CREATE);
434         if (req == NULL)
435                 RETURN(-ENOMEM);
436         req->rq_request_portal = OST_CREATE_PORTAL;
437         /* we should not resend create request - anyway we will have delorphan
438          * and kill these objects */
439         req->rq_no_delay = req->rq_no_resend = 1;
440
441         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
442         if (rc) {
443                 ptlrpc_request_free(req);
444                 RETURN(rc);
445         }
446
447         spin_lock(&d->opd_pre_lock);
448         if (d->opd_pre_grow_count > d->opd_pre_max_grow_count / 2)
449                 d->opd_pre_grow_count = d->opd_pre_max_grow_count / 2;
450         grow = d->opd_pre_grow_count;
451         spin_unlock(&d->opd_pre_lock);
452
453         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
454         LASSERT(body);
455
456         *fid = d->opd_pre_last_created_fid;
457         rc = osp_precreate_fids(env, d, fid, &grow);
458         if (rc == 1) {
459                 /* Current seq has been used up*/
460                 if (!osp_is_fid_client(d)) {
461                         osp_pre_update_status(d, -ENOSPC);
462                         rc = -ENOSPC;
463                 }
464                 wake_up(&d->opd_pre_waitq);
465                 GOTO(out_req, rc);
466         }
467
468         if (!osp_is_fid_client(d)) {
469                 /* Non-FID client will always send seq 0 because of
470                  * compatiblity */
471                 LASSERTF(fid_is_idif(fid), "Invalid fid "DFID"\n", PFID(fid));
472                 fid->f_seq = 0;
473         }
474
475         fid_to_ostid(fid, &body->oa.o_oi);
476         body->oa.o_valid = OBD_MD_FLGROUP;
477
478         ptlrpc_request_set_replen(req);
479
480         rc = ptlrpc_queue_wait(req);
481         if (rc) {
482                 CERROR("%s: can't precreate: rc = %d\n", d->opd_obd->obd_name,
483                        rc);
484                 GOTO(out_req, rc);
485         }
486         LASSERT(req->rq_transno == 0);
487
488         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
489         if (body == NULL)
490                 GOTO(out_req, rc = -EPROTO);
491
492         ostid_to_fid(fid, &body->oa.o_oi, d->opd_index);
493         LASSERTF(lu_fid_diff(fid, &d->opd_pre_used_fid) > 0,
494                  "reply fid "DFID" pre used fid "DFID"\n", PFID(fid),
495                  PFID(&d->opd_pre_used_fid));
496
497         diff = lu_fid_diff(fid, &d->opd_pre_last_created_fid);
498
499         spin_lock(&d->opd_pre_lock);
500         if (diff < grow) {
501                 /* the OST has not managed to create all the
502                  * objects we asked for */
503                 d->opd_pre_grow_count = max(diff, OST_MIN_PRECREATE);
504                 d->opd_pre_grow_slow = 1;
505         } else {
506                 /* the OST is able to keep up with the work,
507                  * we could consider increasing grow_count
508                  * next time if needed */
509                 d->opd_pre_grow_slow = 0;
510         }
511
512         d->opd_pre_last_created_fid = *fid;
513         spin_unlock(&d->opd_pre_lock);
514
515         CDEBUG(D_HA, "%s: current precreated pool: "DFID"-"DFID"\n",
516                d->opd_obd->obd_name, PFID(&d->opd_pre_used_fid),
517                PFID(&d->opd_pre_last_created_fid));
518 out_req:
519         /* now we can wakeup all users awaiting for objects */
520         osp_pre_update_status(d, rc);
521         wake_up(&d->opd_pre_user_waitq);
522
523         ptlrpc_req_finished(req);
524         RETURN(rc);
525 }
526
527 static int osp_get_lastfid_from_ost(const struct lu_env *env,
528                                     struct osp_device *d)
529 {
530         struct ptlrpc_request   *req = NULL;
531         struct obd_import       *imp;
532         struct lu_fid           *last_fid;
533         char                    *tmp;
534         int                     rc;
535         ENTRY;
536
537         imp = d->opd_obd->u.cli.cl_import;
538         LASSERT(imp);
539
540         req = ptlrpc_request_alloc(imp, &RQF_OST_GET_INFO_LAST_FID);
541         if (req == NULL)
542                 RETURN(-ENOMEM);
543
544         req_capsule_set_size(&req->rq_pill, &RMF_GETINFO_KEY, RCL_CLIENT,
545                              sizeof(KEY_LAST_FID));
546
547         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
548         if (rc) {
549                 ptlrpc_request_free(req);
550                 RETURN(rc);
551         }
552
553         tmp = req_capsule_client_get(&req->rq_pill, &RMF_GETINFO_KEY);
554         memcpy(tmp, KEY_LAST_FID, sizeof(KEY_LAST_FID));
555
556         req->rq_no_delay = req->rq_no_resend = 1;
557         last_fid = req_capsule_client_get(&req->rq_pill, &RMF_FID);
558         fid_cpu_to_le(last_fid, &d->opd_last_used_fid);
559
560         ptlrpc_request_set_replen(req);
561
562         rc = ptlrpc_queue_wait(req);
563         if (rc) {
564                 /* bad-bad OST.. let sysadm sort this out */
565                 if (rc == -ENOTSUPP) {
566                         CERROR("%s: server does not support FID: rc = %d\n",
567                                d->opd_obd->obd_name, -ENOTSUPP);
568                 }
569                 ptlrpc_set_import_active(imp, 0);
570                 GOTO(out, rc);
571         }
572
573         last_fid = req_capsule_server_get(&req->rq_pill, &RMF_FID);
574         if (last_fid == NULL) {
575                 CERROR("%s: Got last_fid failed.\n", d->opd_obd->obd_name);
576                 GOTO(out, rc = -EPROTO);
577         }
578
579         if (!fid_is_sane(last_fid)) {
580                 CERROR("%s: Got insane last_fid "DFID"\n",
581                        d->opd_obd->obd_name, PFID(last_fid));
582                 GOTO(out, rc = -EPROTO);
583         }
584
585         /* Only update the last used fid, if the OST has objects for
586          * this sequence, i.e. fid_oid > 0 */
587         if (fid_oid(last_fid) > 0)
588                 d->opd_last_used_fid = *last_fid;
589
590         CDEBUG(D_HA, "%s: Got last_fid "DFID"\n", d->opd_obd->obd_name,
591                PFID(last_fid));
592
593 out:
594         ptlrpc_req_finished(req);
595         RETURN(rc);
596 }
597
598 /**
599  * asks OST to clean precreate orphans
600  * and gets next id for new objects
601  */
602 static int osp_precreate_cleanup_orphans(struct lu_env *env,
603                                          struct osp_device *d)
604 {
605         struct osp_thread_info  *osi = osp_env_info(env);
606         struct lu_fid           *last_fid = &osi->osi_fid;
607         struct ptlrpc_request   *req = NULL;
608         struct obd_import       *imp;
609         struct ost_body         *body;
610         struct l_wait_info       lwi = { 0 };
611         int                      update_status = 0;
612         int                      rc;
613         int                      diff;
614
615         ENTRY;
616
617         /*
618          * wait for local recovery to finish, so we can cleanup orphans
619          * orphans are all objects since "last used" (assigned), but
620          * there might be objects reserved and in some cases they won't
621          * be used. we can't cleanup them till we're sure they won't be
622          * used. also can't we allow new reservations because they may
623          * end up getting orphans being cleaned up below. so we block
624          * new reservations and wait till all reserved objects either
625          * user or released.
626          */
627         spin_lock(&d->opd_pre_lock);
628         d->opd_pre_recovering = 1;
629         spin_unlock(&d->opd_pre_lock);
630         /*
631          * The locking above makes sure the opd_pre_reserved check below will
632          * catch all osp_precreate_reserve() calls who find
633          * "!opd_pre_recovering".
634          */
635         l_wait_event(d->opd_pre_waitq,
636                      (!d->opd_pre_reserved && d->opd_recovery_completed) ||
637                      !osp_precreate_running(d) || d->opd_got_disconnected,
638                      &lwi);
639         if (!osp_precreate_running(d) || d->opd_got_disconnected)
640                 GOTO(out, rc = -EAGAIN);
641
642         CDEBUG(D_HA, "%s: going to cleanup orphans since "DFID"\n",
643                d->opd_obd->obd_name, PFID(&d->opd_last_used_fid));
644
645         *last_fid = d->opd_last_used_fid;
646         /* The OSP should already get the valid seq now */
647         LASSERT(!fid_is_zero(last_fid));
648         if (fid_oid(&d->opd_last_used_fid) < 2) {
649                 /* lastfid looks strange... ask OST */
650                 rc = osp_get_lastfid_from_ost(env, d);
651                 if (rc)
652                         GOTO(out, rc);
653         }
654
655         imp = d->opd_obd->u.cli.cl_import;
656         LASSERT(imp);
657
658         req = ptlrpc_request_alloc(imp, &RQF_OST_CREATE);
659         if (req == NULL)
660                 GOTO(out, rc = -ENOMEM);
661
662         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
663         if (rc) {
664                 ptlrpc_request_free(req);
665                 req = NULL;
666                 GOTO(out, rc);
667         }
668
669         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
670         if (body == NULL)
671                 GOTO(out, rc = -EPROTO);
672
673         body->oa.o_flags = OBD_FL_DELORPHAN;
674         body->oa.o_valid = OBD_MD_FLFLAGS | OBD_MD_FLGROUP;
675
676         fid_to_ostid(&d->opd_last_used_fid, &body->oa.o_oi);
677
678         ptlrpc_request_set_replen(req);
679
680         /* Don't resend the delorphan req */
681         req->rq_no_resend = req->rq_no_delay = 1;
682
683         rc = ptlrpc_queue_wait(req);
684         if (rc) {
685                 update_status = 1;
686                 GOTO(out, rc);
687         }
688
689         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
690         if (body == NULL)
691                 GOTO(out, rc = -EPROTO);
692
693         /*
694          * OST provides us with id new pool starts from in body->oa.o_id
695          */
696         ostid_to_fid(last_fid, &body->oa.o_oi, d->opd_index);
697
698         spin_lock(&d->opd_pre_lock);
699         diff = lu_fid_diff(&d->opd_last_used_fid, last_fid);
700         if (diff > 0) {
701                 d->opd_pre_grow_count = OST_MIN_PRECREATE + diff;
702                 d->opd_pre_last_created_fid = d->opd_last_used_fid;
703         } else {
704                 d->opd_pre_grow_count = OST_MIN_PRECREATE;
705                 d->opd_pre_last_created_fid = *last_fid;
706         }
707         /*
708          * This empties the pre-creation pool and effectively blocks any new
709          * reservations.
710          */
711         LASSERT(fid_oid(&d->opd_pre_last_created_fid) <=
712                 LUSTRE_DATA_SEQ_MAX_WIDTH);
713         d->opd_pre_used_fid = d->opd_pre_last_created_fid;
714         d->opd_pre_grow_slow = 0;
715         spin_unlock(&d->opd_pre_lock);
716
717         CDEBUG(D_HA, "%s: Got last_id "DFID" from OST, last_created "DFID
718                "last_used is "DFID"\n", d->opd_obd->obd_name, PFID(last_fid),
719                PFID(&d->opd_pre_last_created_fid), PFID(&d->opd_last_used_fid));
720 out:
721         if (req)
722                 ptlrpc_req_finished(req);
723
724         spin_lock(&d->opd_pre_lock);
725         d->opd_pre_recovering = 0;
726         spin_unlock(&d->opd_pre_lock);
727
728         /*
729          * If rc is zero, the pre-creation window should have been emptied.
730          * Since waking up the herd would be useless without pre-created
731          * objects, we defer the signal to osp_precreate_send() in that case.
732          */
733         if (rc != 0) {
734                 if (update_status) {
735                         CERROR("%s: cannot cleanup orphans: rc = %d\n",
736                                d->opd_obd->obd_name, rc);
737                         /* we can't proceed from here, OST seem to
738                          * be in a bad shape, better to wait for
739                          * a new instance of the server and repeat
740                          * from the beginning. notify possible waiters
741                          * this OSP isn't quite functional yet */
742                         osp_pre_update_status(d, rc);
743                 } else {
744                         wake_up(&d->opd_pre_user_waitq);
745                 }
746         }
747
748         RETURN(rc);
749 }
750
751 /*
752  * the function updates current precreation status used: functional or not
753  *
754  * rc is a last code from the transport, rc == 0 meaning transport works
755  * well and users of lod can use objects from this OSP
756  *
757  * the status depends on current usage of OST
758  */
759 void osp_pre_update_status(struct osp_device *d, int rc)
760 {
761         struct obd_statfs       *msfs = &d->opd_statfs;
762         int                      old = d->opd_pre_status;
763         __u64                    used;
764
765         d->opd_pre_status = rc;
766         if (rc)
767                 goto out;
768
769         /* Add a bit of hysteresis so this flag isn't continually flapping,
770          * and ensure that new files don't get extremely fragmented due to
771          * only a small amount of available space in the filesystem.
772          * We want to set the NOSPC flag when there is less than ~0.1% free
773          * and clear it when there is at least ~0.2% free space, so:
774          *                   avail < ~0.1% max          max = avail + used
775          *            1025 * avail < avail + used       used = blocks - free
776          *            1024 * avail < used
777          *            1024 * avail < blocks - free
778          *                   avail < ((blocks - free) >> 10)
779          *
780          * On very large disk, say 16TB 0.1% will be 16 GB. We don't want to
781          * lose that amount of space so in those cases we report no space left
782          * if their is less than 1 GB left.                             */
783         if (likely(msfs->os_type)) {
784                 used = min_t(__u64, (msfs->os_blocks - msfs->os_bfree) >> 10,
785                                     1 << 30);
786                 if ((msfs->os_ffree < 32) || (msfs->os_bavail < used)) {
787                         d->opd_pre_status = -ENOSPC;
788                         if (old != -ENOSPC)
789                                 CDEBUG(D_INFO, "%s: status: "LPU64" blocks, "
790                                        LPU64" free, "LPU64" used, "LPU64" "
791                                        "avail -> %d: rc = %d\n",
792                                        d->opd_obd->obd_name, msfs->os_blocks,
793                                        msfs->os_bfree, used, msfs->os_bavail,
794                                        d->opd_pre_status, rc);
795                         CDEBUG(D_INFO,
796                                "non-commited changes: %lu, in progress: %u\n",
797                                d->opd_syn_changes, d->opd_syn_rpc_in_progress);
798                 } else if (old == -ENOSPC) {
799                         d->opd_pre_status = 0;
800                         spin_lock(&d->opd_pre_lock);
801                         d->opd_pre_grow_slow = 0;
802                         d->opd_pre_grow_count = OST_MIN_PRECREATE;
803                         spin_unlock(&d->opd_pre_lock);
804                         wake_up(&d->opd_pre_waitq);
805                         CDEBUG(D_INFO, "%s: no space: "LPU64" blocks, "LPU64
806                                " free, "LPU64" used, "LPU64" avail -> %d: "
807                                "rc = %d\n", d->opd_obd->obd_name,
808                                msfs->os_blocks, msfs->os_bfree, used,
809                                msfs->os_bavail, d->opd_pre_status, rc);
810                 }
811         }
812
813 out:
814         wake_up(&d->opd_pre_user_waitq);
815 }
816
817 int osp_init_pre_fid(struct osp_device *osp)
818 {
819         struct lu_env           env;
820         struct osp_thread_info  *osi;
821         struct lu_client_seq    *cli_seq;
822         struct lu_fid           *last_fid;
823         int                     rc;
824         ENTRY;
825
826         LASSERT(osp->opd_pre != NULL);
827
828         /* Return if last_used fid has been initialized */
829         if (!fid_is_zero(&osp->opd_last_used_fid))
830                 RETURN(0);
831
832         rc = lu_env_init(&env, osp->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
833         if (rc) {
834                 CERROR("%s: init env error: rc = %d\n",
835                        osp->opd_obd->obd_name, rc);
836                 RETURN(rc);
837         }
838
839         osi = osp_env_info(&env);
840         last_fid = &osi->osi_fid;
841         fid_zero(last_fid);
842         /* For a freshed fs, it will allocate a new sequence first */
843         if (osp_is_fid_client(osp) && osp->opd_group != 0) {
844                 cli_seq = osp->opd_obd->u.cli.cl_seq;
845                 rc = seq_client_get_seq(&env, cli_seq, &last_fid->f_seq);
846                 if (rc != 0) {
847                         CERROR("%s: alloc fid error: rc = %d\n",
848                                osp->opd_obd->obd_name, rc);
849                         GOTO(out, rc);
850                 }
851         } else {
852                 last_fid->f_seq = fid_idif_seq(0, osp->opd_index);
853         }
854         last_fid->f_oid = 1;
855         last_fid->f_ver = 0;
856
857         spin_lock(&osp->opd_pre_lock);
858         osp->opd_last_used_fid = *last_fid;
859         osp->opd_pre_used_fid = *last_fid;
860         osp->opd_pre_last_created_fid = *last_fid;
861         spin_unlock(&osp->opd_pre_lock);
862         rc = osp_write_last_oid_seq_files(&env, osp, last_fid, 1);
863         if (rc != 0) {
864                 CERROR("%s: write fid error: rc = %d\n",
865                        osp->opd_obd->obd_name, rc);
866                 GOTO(out, rc);
867         }
868 out:
869         lu_env_fini(&env);
870         RETURN(rc);
871 }
872
873 static int osp_precreate_thread(void *_arg)
874 {
875         struct osp_device       *d = _arg;
876         struct ptlrpc_thread    *thread = &d->opd_pre_thread;
877         struct l_wait_info       lwi = { 0 };
878         struct lu_env            env;
879         int                      rc;
880
881         ENTRY;
882
883         rc = lu_env_init(&env, d->opd_dt_dev.dd_lu_dev.ld_type->ldt_ctx_tags);
884         if (rc) {
885                 CERROR("%s: init env error: rc = %d\n", d->opd_obd->obd_name,
886                        rc);
887                 RETURN(rc);
888         }
889
890         spin_lock(&d->opd_pre_lock);
891         thread->t_flags = SVC_RUNNING;
892         spin_unlock(&d->opd_pre_lock);
893         wake_up(&thread->t_ctl_waitq);
894
895         while (osp_precreate_running(d)) {
896                 /*
897                  * need to be connected to OST
898                  */
899                 while (osp_precreate_running(d)) {
900                         l_wait_event(d->opd_pre_waitq,
901                                      !osp_precreate_running(d) ||
902                                      d->opd_new_connection,
903                                      &lwi);
904
905                         if (!d->opd_new_connection)
906                                 continue;
907
908                         d->opd_new_connection = 0;
909                         d->opd_got_disconnected = 0;
910                         break;
911                 }
912
913                 if (!osp_precreate_running(d))
914                         break;
915
916                 LASSERT(d->opd_obd->u.cli.cl_seq != NULL);
917                 /* Sigh, fid client is not ready yet */
918                 if (d->opd_obd->u.cli.cl_seq->lcs_exp == NULL)
919                         continue;
920
921                 /* Init fid for osp_precreate if necessary */
922                 rc = osp_init_pre_fid(d);
923                 if (rc != 0) {
924                         class_export_put(d->opd_exp);
925                         d->opd_obd->u.cli.cl_seq->lcs_exp = NULL;
926                         CERROR("%s: init pre fid error: rc = %d\n",
927                                d->opd_obd->obd_name, rc);
928                         continue;
929                 }
930
931                 osp_statfs_update(d);
932
933                 /*
934                  * Clean up orphans or recreate missing objects.
935                  */
936                 rc = osp_precreate_cleanup_orphans(&env, d);
937                 if (rc != 0)
938                         continue;
939                 /*
940                  * connected, can handle precreates now
941                  */
942                 while (osp_precreate_running(d)) {
943                         l_wait_event(d->opd_pre_waitq,
944                                      !osp_precreate_running(d) ||
945                                      osp_precreate_near_empty(&env, d) ||
946                                      osp_statfs_need_update(d) ||
947                                      d->opd_got_disconnected, &lwi);
948
949                         if (!osp_precreate_running(d))
950                                 break;
951
952                         /* something happened to the connection
953                          * have to start from the beginning */
954                         if (d->opd_got_disconnected)
955                                 break;
956
957                         if (osp_statfs_need_update(d))
958                                 osp_statfs_update(d);
959
960                         /* To avoid handling different seq in precreate/orphan
961                          * cleanup, it will hold precreate until current seq is
962                          * used up. */
963                         if (unlikely(osp_precreate_end_seq(&env, d) &&
964                             !osp_create_end_seq(&env, d)))
965                                 continue;
966
967                         if (unlikely(osp_precreate_end_seq(&env, d) &&
968                                      osp_create_end_seq(&env, d))) {
969                                 LCONSOLE_INFO("%s:"LPX64" is used up."
970                                               " Update to new seq\n",
971                                               d->opd_obd->obd_name,
972                                          fid_seq(&d->opd_pre_last_created_fid));
973                                 rc = osp_precreate_rollover_new_seq(&env, d);
974                                 if (rc)
975                                         continue;
976                         }
977
978                         if (osp_precreate_near_empty(&env, d)) {
979                                 rc = osp_precreate_send(&env, d);
980                                 /* osp_precreate_send() sets opd_pre_status
981                                  * in case of error, that prevent the using of
982                                  * failed device. */
983                                 if (rc < 0 && rc != -ENOSPC &&
984                                     rc != -ETIMEDOUT && rc != -ENOTCONN)
985                                         CERROR("%s: cannot precreate objects:"
986                                                " rc = %d\n",
987                                                d->opd_obd->obd_name, rc);
988                         }
989                 }
990         }
991
992         thread->t_flags = SVC_STOPPED;
993         lu_env_fini(&env);
994         wake_up(&thread->t_ctl_waitq);
995
996         RETURN(0);
997 }
998
999 static int osp_precreate_ready_condition(const struct lu_env *env,
1000                                          struct osp_device *d)
1001 {
1002         if (d->opd_pre_recovering)
1003                 return 0;
1004
1005         /* ready if got enough precreated objects */
1006         /* we need to wait for others (opd_pre_reserved) and our object (+1) */
1007         if (d->opd_pre_reserved + 1 < osp_objs_precreated(env, d))
1008                 return 1;
1009
1010         /* ready if OST reported no space and no destroys in progress */
1011         if (d->opd_syn_changes + d->opd_syn_rpc_in_progress == 0 &&
1012             d->opd_pre_status == -ENOSPC)
1013                 return 1;
1014
1015         /* Bail out I/O fails to OST */
1016         if (d->opd_pre_status != 0 &&
1017             d->opd_pre_status != -EAGAIN &&
1018             d->opd_pre_status != -ENODEV &&
1019             d->opd_pre_status != -ENOSPC) {
1020                 /* DEBUG LU-3230 */
1021                 if (d->opd_pre_status != -EIO)
1022                         CERROR("%s: precreate failed opd_pre_status %d\n",
1023                                d->opd_obd->obd_name, d->opd_pre_status);
1024                 return 1;
1025         }
1026
1027         return 0;
1028 }
1029
1030 static int osp_precreate_timeout_condition(void *data)
1031 {
1032         struct osp_device *d = data;
1033
1034         LCONSOLE_WARN("%s: slow creates, last="DFID", next="DFID", "
1035                       "reserved="LPU64", syn_changes=%lu, "
1036                       "syn_rpc_in_progress=%d, status=%d\n",
1037                       d->opd_obd->obd_name, PFID(&d->opd_pre_last_created_fid),
1038                       PFID(&d->opd_pre_used_fid), d->opd_pre_reserved,
1039                       d->opd_syn_changes, d->opd_syn_rpc_in_progress,
1040                       d->opd_pre_status);
1041
1042         return 1;
1043 }
1044
1045 /*
1046  * called to reserve object in the pool
1047  * return codes:
1048  *  ENOSPC - no space on corresponded OST
1049  *  EAGAIN - precreation is in progress, try later
1050  *  EIO    - no access to OST
1051  */
1052 int osp_precreate_reserve(const struct lu_env *env, struct osp_device *d)
1053 {
1054         struct l_wait_info       lwi;
1055         cfs_time_t               expire = cfs_time_shift(obd_timeout);
1056         int                      precreated, rc;
1057
1058         ENTRY;
1059
1060         LASSERTF(osp_objs_precreated(env, d) >= 0, "Last created FID "DFID
1061                  "Next FID "DFID"\n", PFID(&d->opd_pre_last_created_fid),
1062                  PFID(&d->opd_pre_used_fid));
1063
1064         /*
1065          * wait till:
1066          *  - preallocation is done
1067          *  - no free space expected soon
1068          *  - can't connect to OST for too long (obd_timeout)
1069          *  - OST can allocate fid sequence.
1070          */
1071         while ((rc = d->opd_pre_status) == 0 || rc == -ENOSPC ||
1072                 rc == -ENODEV || rc == -EAGAIN || rc == -ENOTCONN) {
1073
1074                 /*
1075                  * increase number of precreations
1076                  */
1077                 precreated = osp_objs_precreated(env, d);
1078                 if (d->opd_pre_grow_count < d->opd_pre_max_grow_count &&
1079                     d->opd_pre_grow_slow == 0 &&
1080                     precreated <= (d->opd_pre_grow_count / 4 + 1)) {
1081                         spin_lock(&d->opd_pre_lock);
1082                         d->opd_pre_grow_slow = 1;
1083                         d->opd_pre_grow_count *= 2;
1084                         spin_unlock(&d->opd_pre_lock);
1085                 }
1086
1087                 spin_lock(&d->opd_pre_lock);
1088                 precreated = osp_objs_precreated(env, d);
1089                 if (precreated > d->opd_pre_reserved &&
1090                     !d->opd_pre_recovering) {
1091                         d->opd_pre_reserved++;
1092                         spin_unlock(&d->opd_pre_lock);
1093                         rc = 0;
1094
1095                         /* XXX: don't wake up if precreation is in progress */
1096                         if (osp_precreate_near_empty_nolock(env, d) &&
1097                            !osp_precreate_end_seq_nolock(env, d))
1098                                 wake_up(&d->opd_pre_waitq);
1099
1100                         break;
1101                 }
1102                 spin_unlock(&d->opd_pre_lock);
1103
1104                 /*
1105                  * all precreated objects have been used and no-space
1106                  * status leave us no chance to succeed very soon
1107                  * but if there is destroy in progress, then we should
1108                  * wait till that is done - some space might be released
1109                  */
1110                 if (unlikely(rc == -ENOSPC)) {
1111                         if (d->opd_syn_changes) {
1112                                 /* force local commit to release space */
1113                                 dt_commit_async(env, d->opd_storage);
1114                         }
1115                         if (d->opd_syn_rpc_in_progress) {
1116                                 /* just wait till destroys are done */
1117                                 /* see l_wait_even() few lines below */
1118                         }
1119                         if (d->opd_syn_changes +
1120                             d->opd_syn_rpc_in_progress == 0) {
1121                                 /* no hope for free space */
1122                                 break;
1123                         }
1124                 }
1125
1126                 /* XXX: don't wake up if precreation is in progress */
1127                 wake_up(&d->opd_pre_waitq);
1128
1129                 lwi = LWI_TIMEOUT(expire - cfs_time_current(),
1130                                 osp_precreate_timeout_condition, d);
1131                 if (cfs_time_aftereq(cfs_time_current(), expire)) {
1132                         rc = -ETIMEDOUT;
1133                         break;
1134                 }
1135
1136                 l_wait_event(d->opd_pre_user_waitq,
1137                              osp_precreate_ready_condition(env, d), &lwi);
1138         }
1139
1140         RETURN(rc);
1141 }
1142
1143 /*
1144  * this function relies on reservation made before
1145  */
1146 int osp_precreate_get_fid(const struct lu_env *env, struct osp_device *d,
1147                           struct lu_fid *fid)
1148 {
1149         /* grab next id from the pool */
1150         spin_lock(&d->opd_pre_lock);
1151
1152         LASSERTF(lu_fid_diff(&d->opd_pre_used_fid,
1153                              &d->opd_pre_last_created_fid) < 0,
1154                  "next fid "DFID" last created fid "DFID"\n",
1155                  PFID(&d->opd_pre_used_fid),
1156                  PFID(&d->opd_pre_last_created_fid));
1157
1158         d->opd_pre_used_fid.f_oid++;
1159         memcpy(fid, &d->opd_pre_used_fid, sizeof(*fid));
1160         d->opd_pre_reserved--;
1161         /*
1162          * last_used_id must be changed along with getting new id otherwise
1163          * we might miscalculate gap causing object loss or leak
1164          */
1165         osp_update_last_fid(d, fid);
1166         spin_unlock(&d->opd_pre_lock);
1167
1168         /*
1169          * probably main thread suspended orphan cleanup till
1170          * all reservations are released, see comment in
1171          * osp_precreate_thread() just before orphan cleanup
1172          */
1173         if (unlikely(d->opd_pre_reserved == 0 && d->opd_pre_status))
1174                 wake_up(&d->opd_pre_waitq);
1175
1176         return 0;
1177 }
1178
1179 /*
1180  *
1181  */
1182 int osp_object_truncate(const struct lu_env *env, struct dt_object *dt,
1183                         __u64 size)
1184 {
1185         struct osp_device       *d = lu2osp_dev(dt->do_lu.lo_dev);
1186         struct ptlrpc_request   *req = NULL;
1187         struct obd_import       *imp;
1188         struct ost_body         *body;
1189         struct obdo             *oa = NULL;
1190         int                      rc;
1191
1192         ENTRY;
1193
1194         imp = d->opd_obd->u.cli.cl_import;
1195         LASSERT(imp);
1196
1197         req = ptlrpc_request_alloc(imp, &RQF_OST_PUNCH);
1198         if (req == NULL)
1199                 RETURN(-ENOMEM);
1200
1201         /* XXX: capa support? */
1202         /* osc_set_capa_size(req, &RMF_CAPA1, capa); */
1203         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
1204         if (rc) {
1205                 ptlrpc_request_free(req);
1206                 RETURN(rc);
1207         }
1208
1209         /*
1210          * XXX: decide how do we do here with resend
1211          * if we don't resend, then client may see wrong file size
1212          * if we do resend, then MDS thread can get stuck for quite long
1213          */
1214         req->rq_no_resend = req->rq_no_delay = 1;
1215
1216         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1217         ptlrpc_at_set_req_timeout(req);
1218
1219         OBD_ALLOC_PTR(oa);
1220         if (oa == NULL)
1221                 GOTO(out, rc = -ENOMEM);
1222
1223         rc = fid_to_ostid(lu_object_fid(&dt->do_lu), &oa->o_oi);
1224         LASSERT(rc == 0);
1225         oa->o_size = size;
1226         oa->o_blocks = OBD_OBJECT_EOF;
1227         oa->o_valid = OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
1228                       OBD_MD_FLID | OBD_MD_FLGROUP;
1229
1230         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1231         LASSERT(body);
1232         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1233
1234         /* XXX: capa support? */
1235         /* osc_pack_capa(req, body, capa); */
1236
1237         ptlrpc_request_set_replen(req);
1238
1239         rc = ptlrpc_queue_wait(req);
1240         if (rc)
1241                 CERROR("can't punch object: %d\n", rc);
1242 out:
1243         ptlrpc_req_finished(req);
1244         if (oa)
1245                 OBD_FREE_PTR(oa);
1246         RETURN(rc);
1247 }
1248
1249 int osp_init_precreate(struct osp_device *d)
1250 {
1251         struct l_wait_info       lwi = { 0 };
1252         struct task_struct              *task;
1253
1254         ENTRY;
1255
1256         OBD_ALLOC_PTR(d->opd_pre);
1257         if (d->opd_pre == NULL)
1258                 RETURN(-ENOMEM);
1259
1260         /* initially precreation isn't ready */
1261         d->opd_pre_status = -EAGAIN;
1262         fid_zero(&d->opd_pre_used_fid);
1263         d->opd_pre_used_fid.f_oid = 1;
1264         fid_zero(&d->opd_pre_last_created_fid);
1265         d->opd_pre_last_created_fid.f_oid = 1;
1266         d->opd_pre_reserved = 0;
1267         d->opd_got_disconnected = 1;
1268         d->opd_pre_grow_slow = 0;
1269         d->opd_pre_grow_count = OST_MIN_PRECREATE;
1270         d->opd_pre_min_grow_count = OST_MIN_PRECREATE;
1271         d->opd_pre_max_grow_count = OST_MAX_PRECREATE;
1272
1273         spin_lock_init(&d->opd_pre_lock);
1274         init_waitqueue_head(&d->opd_pre_waitq);
1275         init_waitqueue_head(&d->opd_pre_user_waitq);
1276         init_waitqueue_head(&d->opd_pre_thread.t_ctl_waitq);
1277
1278         /*
1279          * Initialize statfs-related things
1280          */
1281         d->opd_statfs_maxage = 5; /* default update interval */
1282         d->opd_statfs_fresh_till = cfs_time_shift(-1000);
1283         CDEBUG(D_OTHER, "current %llu, fresh till %llu\n",
1284                (unsigned long long)cfs_time_current(),
1285                (unsigned long long)d->opd_statfs_fresh_till);
1286         cfs_timer_init(&d->opd_statfs_timer, osp_statfs_timer_cb, d);
1287
1288         /*
1289          * start thread handling precreation and statfs updates
1290          */
1291         task = kthread_run(osp_precreate_thread, d,
1292                                "osp-pre-%u", d->opd_index);
1293         if (IS_ERR(task)) {
1294                 CERROR("can't start precreate thread %ld\n", PTR_ERR(task));
1295                 RETURN(PTR_ERR(task));
1296         }
1297
1298         l_wait_event(d->opd_pre_thread.t_ctl_waitq,
1299                      osp_precreate_running(d) || osp_precreate_stopped(d),
1300                      &lwi);
1301
1302         RETURN(0);
1303 }
1304
1305 void osp_precreate_fini(struct osp_device *d)
1306 {
1307         struct ptlrpc_thread *thread;
1308
1309         ENTRY;
1310
1311         cfs_timer_disarm(&d->opd_statfs_timer);
1312
1313         if (d->opd_pre == NULL)
1314                 RETURN_EXIT;
1315
1316         thread = &d->opd_pre_thread;
1317
1318         thread->t_flags = SVC_STOPPING;
1319         wake_up(&d->opd_pre_waitq);
1320
1321         wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
1322
1323         OBD_FREE_PTR(d->opd_pre);
1324         d->opd_pre = NULL;
1325
1326         EXIT;
1327 }
1328