Whamcloud - gitweb
fed62021f51852702d001cae3285a5c20ed7bd47
[fs/lustre-release.git] / lustre / osp / osp_sync.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/osp/osp_sync.c
33  *
34  * Lustre OST Proxy Device
35  *
36  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
37  * Author: Mikhail Pershin <mike.pershin@intel.com>
38  */
39
40 #define DEBUG_SUBSYSTEM S_MDS
41
42 #include <linux/kthread.h>
43 #include <lustre_log.h>
44 #include <lustre_update.h>
45 #include "osp_internal.h"
46
47 static int osp_sync_id_traction_init(struct osp_device *d);
48 static void osp_sync_id_traction_fini(struct osp_device *d);
49 static __u64 osp_sync_id_get(struct osp_device *d, __u64 id);
50 static void osp_sync_remove_from_tracker(struct osp_device *d);
51
52 /*
 * this is a component of OSP implementing synchronization between MDS and OST
 * it llogs all interesting changes (currently it's uid/gid change and object
 * destroy) atomically, then makes sure changes hit OST storage
56  *
57  * we have 4 queues of work:
58  *
59  * the first queue is llog itself, once read a change is stored in 2nd queue
60  * in form of RPC (but RPC isn't fired yet).
61  *
62  * the second queue (opd_sync_waiting_for_commit) holds changes awaiting local
63  * commit. once change is committed locally it migrates onto 3rd queue.
64  *
65  * the third queue (opd_sync_committed_here) holds changes committed locally,
66  * but not sent to OST (as the pipe can be full). once pipe becomes non-full
 * we take a change from the queue and fire the corresponding RPC.
68  *
69  * once RPC is reported committed by OST (using regular last_committed mech.)
70  * the change jumps into 4th queue (opd_sync_committed_there), now we can
 * cancel the corresponding llog record and release RPC
72  *
73  * opd_sync_changes is a number of unread llog records (to be processed).
74  * notice this number doesn't include llog records from previous boots.
75  * with OSP_SYNC_THRESHOLD we try to batch processing a bit (TO BE IMPLEMENTED)
76  *
77  * opd_sync_rpcs_in_progress is total number of requests in above 2-4 queues.
 * we control this with OSP_MAX_RPCS_IN_PROGRESS so that OSP doesn't consume
 * too much memory -- how to deal with 1000s of OSTs ? batching could help?
80  *
81  * opd_sync_rpcs_in_flight is a number of RPC in flight.
82  * we control this with OSP_MAX_RPCS_IN_FLIGHT
83  */
84
85 /* XXX: do math to learn reasonable threshold
86  * should it be ~ number of changes fitting bulk? */
87
88 #define OSP_SYNC_THRESHOLD              10
89 #define OSP_MAX_RPCS_IN_FLIGHT          8
90 #define OSP_MAX_RPCS_IN_PROGRESS        4096
91
92 #define OSP_JOB_MAGIC           0x26112005
93
/* Per-RPC state stored in ptlrpc_request::rq_async_args for requests
 * generated from llog records; jra_magic guards against misinterpreting
 * the args area in the callbacks. */
struct osp_job_req_args {
	/** bytes reserved for ptlrpc_replay_req() */
	struct ptlrpc_replay_async_args jra_raa;
	/* linkage on opd_sync_committed_there once committed by the target */
	struct list_head		jra_committed_link;
	/* linkage on opd_sync_in_flight_list while the RPC is in flight */
	struct list_head		jra_in_flight_link;
	/* cookie of the originating llog record, used for cancellation */
	struct llog_cookie		jra_lcookie;
	/* must be OSP_JOB_MAGIC */
	__u32				jra_magic;
};
102
/**
 * Check status: whether OSP sync thread is running
 *
 * \param[in] d		OSP device
 *
 * \retval 1		running
 * \retval 0		not running
 */
static inline int osp_sync_running(struct osp_device *d)
{
	return !!(d->opd_sync_thread.t_flags & SVC_RUNNING);
}
107
108 /**
109  * Check status: whether OSP thread has stopped
110  *
111  * \param[in] d         OSP device
112  *
113  * \retval 0            still running
114  * \retval 1            stopped
115  */
116 static inline int osp_sync_stopped(struct osp_device *d)
117 {
118         return !!(d->opd_sync_thread.t_flags & SVC_STOPPED);
119 }
120
121 /*
122  ** Check for new changes to sync
123  *
124  * \param[in] d         OSP device
125  *
126  * \retval 1            there are changes
127  * \retval 0            there are no changes
128  */
129 static inline int osp_sync_has_new_job(struct osp_device *d)
130 {
131         return ((d->opd_sync_last_processed_id < d->opd_sync_last_used_id) &&
132                 (d->opd_sync_last_processed_id < d->opd_sync_last_committed_id))
133                 || (d->opd_sync_prev_done == 0);
134 }
135
/**
 * Check whether an in-flight RPC targets the same object as record \a h
 *
 * Two changes to the same object must not be in flight concurrently,
 * otherwise they could be applied by the OST in the wrong order; the
 * caller holds the record back while a conflict exists.
 *
 * \param[in] d		OSP device
 * \param[in] h		llog record to check (may be NULL)
 *
 * \retval 1		a conflicting RPC is in flight (or the record is bad)
 * \retval 0		no conflict
 */
static inline int osp_sync_in_flight_conflict(struct osp_device *d,
					     struct llog_rec_hdr *h)
{
	struct osp_job_req_args	*jra;
	struct ost_id		 ostid;
	int			 conflict = 0;

	/* generation records name no object; an empty in-flight list
	 * cannot conflict with anything */
	if (h == NULL || h->lrh_type == LLOG_GEN_REC ||
	    list_empty(&d->opd_sync_in_flight_list))
		return conflict;

	/* extract the target object id from the record */
	memset(&ostid, 0, sizeof(ostid));
	switch (h->lrh_type) {
	case MDS_UNLINK_REC: {
		struct llog_unlink_rec *unlink = (struct llog_unlink_rec *)h;

		ostid_set_seq(&ostid, unlink->lur_oseq);
		if (ostid_set_id(&ostid, unlink->lur_oid)) {
			CERROR("Bad %llu to set " DOSTID "\n",
			       (unsigned long long)(unlink->lur_oid),
			       POSTID(&ostid));
			/* report a conflict so the bad record is not sent */
			return 1;
		}
		}
		break;
	case MDS_UNLINK64_REC:
		fid_to_ostid(&((struct llog_unlink64_rec *)h)->lur_fid, &ostid);
		break;
	case MDS_SETATTR64_REC:
		ostid = ((struct llog_setattr64_rec *)h)->lsr_oi;
		break;
	default:
		LBUG();
	}

	/* compare against the object id carried by each in-flight request */
	spin_lock(&d->opd_sync_lock);
	list_for_each_entry(jra, &d->opd_sync_in_flight_list,
			    jra_in_flight_link) {
		struct ptlrpc_request	*req;
		struct ost_body		*body;

		LASSERT(jra->jra_magic == OSP_JOB_MAGIC);

		/* jra lives inside rq_async_args of the request */
		req = container_of((void *)jra, struct ptlrpc_request,
				   rq_async_args);
		body = req_capsule_client_get(&req->rq_pill,
					      &RMF_OST_BODY);
		LASSERT(body);

		if (memcmp(&ostid, &body->oa.o_oi, sizeof(ostid)) == 0) {
			conflict = 1;
			break;
		}
	}
	spin_unlock(&d->opd_sync_lock);

	return conflict;
}
194
/**
 * Check for room in the overall pipeline (queues 2-4, see top of file)
 *
 * \param[in] d		OSP device
 *
 * \retval 1		below opd_sync_max_rpcs_in_progress
 * \retval 0		the limit is reached
 */
static inline int osp_sync_rpcs_in_progress_low(struct osp_device *d)
{
	return atomic_read(&d->opd_sync_rpcs_in_progress) <
		d->opd_sync_max_rpcs_in_progress;
}
200
201 /**
202  * Check for room in the network pipe to OST
203  *
204  * \param[in] d         OSP device
205  *
206  * \retval 1            there is room
207  * \retval 0            no room, the pipe is full
208  */
209 static inline int osp_sync_rpcs_in_flight_low(struct osp_device *d)
210 {
211         return atomic_read(&d->opd_sync_rpcs_in_flight) <
212                 d->opd_sync_max_rpcs_in_flight;
213 }
214
/**
 * Wake up check for the main sync thread
 *
 * \param[in] osp	OSP device
 *
 * \retval 1		time to wake up
 * \retval 0		no need to wake up
 */
static inline int osp_sync_has_work(struct osp_device *osp)
{
	/* has new/old changes and low in-progress? */
	if (osp_sync_has_new_job(osp) && osp_sync_rpcs_in_progress_low(osp) &&
	    osp_sync_rpcs_in_flight_low(osp) && osp->opd_imp_connected)
		return 1;

	/* has remotely committed? */
	if (!list_empty(&osp->opd_sync_committed_there))
		return 1;

	return 0;
}
236
/* Wake the sync thread if there is work for it, see osp_sync_has_work() */
void osp_sync_check_for_work(struct osp_device *osp)
{
	if (osp_sync_has_work(osp))
		wake_up(&osp->opd_sync_waitq);
}
242
/**
 * Reconstruct the full 64-bit sync id from the 32-bit lrh_id stored in a
 * llog record, using opd_sync_last_committed_id to supply the upper bits.
 *
 * \param[in] d		OSP device
 * \param[in] rec	llog record carrying the (possibly wrapped) lrh_id
 *
 * \retval the 64-bit id corresponding to rec->lrh_id
 */
static inline __u64 osp_sync_correct_id(struct osp_device *d,
					struct llog_rec_hdr *rec)
{
	/*
	 * llog use cyclic store with 32 bit lrh_id
	 * so overflow lrh_id is possible. Range between
	 * last_processed and last_committed is less than
	 * 64745 ^ 2 and less than 2^32 - 1
	 */
	__u64 correct_id = d->opd_sync_last_committed_id;

	/* lrh_id is ahead of the committed id's low word: the 32-bit
	 * counter wrapped, so borrow one from the high word */
	if ((correct_id & 0xffffffffULL) < rec->lrh_id)
		correct_id -= 0x100000000ULL;

	correct_id &= ~0xffffffffULL;
	correct_id |= rec->lrh_id;

	return correct_id;
}
/**
 * Check and return ready-for-new status.
 *
 * The thread processing llog record uses this function to check whether
 * it's time to take another record and process it. The number of conditions
 * must be met: the connection should be ready, RPCs in flight not exceeding
 * the limit, the record is committed locally, etc (see the lines below).
 *
 * \param[in] d		OSP device
 * \param[in] rec	next llog record to process
 *
 * \retval 0		not ready
 * \retval 1		ready
 */
static inline int osp_sync_can_process_new(struct osp_device *d,
					   struct llog_rec_hdr *rec)
{
	LASSERT(d);

	/* a barrier is in progress - hold off new work */
	if (unlikely(atomic_read(&d->opd_sync_barrier) > 0))
		return 0;
	/* an in-flight RPC already targets the same object */
	if (unlikely(osp_sync_in_flight_conflict(d, rec)))
		return 0;
	if (!osp_sync_rpcs_in_progress_low(d))
		return 0;
	if (!osp_sync_rpcs_in_flight_low(d))
		return 0;
	if (!d->opd_imp_connected)
		return 0;
	/* records from previous boots are known committed already */
	if (d->opd_sync_prev_done == 0)
		return 1;
	/* nothing new to process */
	if (atomic_read(&d->opd_sync_changes) == 0)
		return 0;
	/* only process records already committed locally */
	if (rec == NULL ||
	    osp_sync_correct_id(d, rec) <= d->opd_sync_last_committed_id)
		return 1;
	return 0;
}
300
/**
 * Declare intention to add a new change.
 *
 * With regard to OSD API, we have to declare any changes ahead. In this
 * case we declare an intention to add a llog record representing the
 * change on the local storage.
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] o		OSP object
 * \param[in] type	type of change: MDS_UNLINK64_REC or MDS_SETATTR64_REC
 * \param[in] th	transaction handle (local)
 *
 * \retval 0		on success
 * \retval negative	negated errno on error
 */
int osp_sync_declare_add(const struct lu_env *env, struct osp_object *o,
			 llog_op_type type, struct thandle *th)
{
	struct osp_thread_info	*osi = osp_env_info(env);
	struct osp_device	*d = lu2osp_dev(o->opo_obj.do_lu.lo_dev);
	struct llog_ctxt	*ctxt;
	struct thandle		*storage_th;
	int			 rc;

	ENTRY;

	/* it's a layering violation, to access internals of th,
	 * but we can do this as a sanity check, for a while */
	LASSERT(th->th_top != NULL);
	storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage);
	if (IS_ERR(storage_th))
		RETURN(PTR_ERR(storage_th));

	/* only the record length depends on the change type at declare time */
	switch (type) {
	case MDS_UNLINK64_REC:
		osi->osi_hdr.lrh_len = sizeof(struct llog_unlink64_rec);
		break;
	case MDS_SETATTR64_REC:
		osi->osi_hdr.lrh_len = sizeof(struct llog_setattr64_rec_v2);
		break;
	default:
		LBUG();
	}

	/* we want ->dt_trans_start() to allocate per-thandle structure */
	storage_th->th_tags |= LCT_OSP_THREAD;

	ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
	LASSERT(ctxt);

	rc = llog_declare_add(env, ctxt->loc_handle, &osi->osi_hdr,
			      storage_th);
	llog_ctxt_put(ctxt);

	RETURN(rc);
}
357
/**
 * Generate a llog record for a given change.
 *
 * Generates a llog record for the change passed. The change can be of two
 * types: unlink and setattr. The record gets an ID which later will be
 * used to track commit status of the change. For unlink changes, the caller
 * can supply a starting FID and the count of the objects to destroy. For
 * setattr the caller should apply attributes to apply.
 *
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] d		OSP device
 * \param[in] fid	fid of the object the change should be applied to
 * \param[in] type	type of change: MDS_UNLINK64_REC or MDS_SETATTR64_REC
 * \param[in] count	count of objects to destroy
 * \param[in] th	transaction handle (local)
 * \param[in] attr	attributes for setattr
 *
 * \retval 0		always (errors only suppress the llog record)
 */
static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
			    const struct lu_fid *fid, llog_op_type type,
			    int count, struct thandle *th,
			    const struct lu_attr *attr)
{
	struct osp_thread_info	*osi = osp_env_info(env);
	struct llog_ctxt	*ctxt;
	struct osp_txn_info	*txn;
	struct thandle		*storage_th;
	int			 rc;

	ENTRY;

	/* it's a layering violation, to access internals of th,
	 * but we can do this as a sanity check, for a while */
	LASSERT(th->th_top != NULL);
	storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage);
	if (IS_ERR(storage_th))
		RETURN(PTR_ERR(storage_th));

	/* fill the type-specific record body kept in osp_thread_info */
	switch (type) {
	case MDS_UNLINK64_REC:
		osi->osi_hdr.lrh_len = sizeof(osi->osi_unlink);
		osi->osi_hdr.lrh_type = MDS_UNLINK64_REC;
		osi->osi_unlink.lur_fid  = *fid;
		osi->osi_unlink.lur_count = count;
		break;
	case MDS_SETATTR64_REC:
		rc = fid_to_ostid(fid, &osi->osi_oi);
		LASSERT(rc == 0);
		osi->osi_hdr.lrh_len = sizeof(osi->osi_setattr);
		osi->osi_hdr.lrh_type = MDS_SETATTR64_REC;
		osi->osi_setattr.lsr_oi  = osi->osi_oi;
		LASSERT(attr);
		osi->osi_setattr.lsr_uid = attr->la_uid;
		osi->osi_setattr.lsr_gid = attr->la_gid;
		osi->osi_setattr.lsr_projid = attr->la_projid;
		/* record which of uid/gid/projid are actually valid */
		osi->osi_setattr.lsr_valid =
			((attr->la_valid & LA_UID) ? OBD_MD_FLUID : 0) |
			((attr->la_valid & LA_GID) ? OBD_MD_FLGID : 0) |
			((attr->la_valid & LA_PROJID) ? OBD_MD_FLPROJID : 0);
		break;
	default:
		LBUG();
	}

	txn = osp_txn_info(&storage_th->th_ctx);
	LASSERT(txn);

	/* allocate the next sync id; only the low 32 bits fit in lrh_id
	 * (see osp_sync_correct_id() for the reconstruction) */
	txn->oti_current_id = osp_sync_id_get(d, txn->oti_current_id);
	osi->osi_hdr.lrh_id = (txn->oti_current_id & 0xffffffffULL);
	ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
	if (ctxt == NULL)
		RETURN(-ENOMEM);

	rc = llog_add(env, ctxt->loc_handle, &osi->osi_hdr, &osi->osi_cookie,
		      storage_th);
	llog_ctxt_put(ctxt);

	if (likely(rc >= 0)) {
		CDEBUG(D_OTHER, "%s: new record "DFID":%x.%u: rc = %d\n",
		       d->opd_obd->obd_name,
		       PFID(&osi->osi_cookie.lgc_lgl.lgl_oi.oi_fid),
		       osi->osi_cookie.lgc_lgl.lgl_ogen,
		       osi->osi_cookie.lgc_index, rc);
		atomic_inc(&d->opd_sync_changes);
	}
	/* return 0 always here, error case just cause no llog record */
	RETURN(0);
}
449
/**
 * Add a llog record for a change on object \a o, see osp_sync_add_rec().
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] o		OSP object the change applies to
 * \param[in] type	type of change: MDS_UNLINK64_REC or MDS_SETATTR64_REC
 * \param[in] th	transaction handle (local)
 * \param[in] attr	attributes for setattr, NULL otherwise
 *
 * \retval 0		always, see osp_sync_add_rec()
 */
int osp_sync_add(const struct lu_env *env, struct osp_object *o,
		 llog_op_type type, struct thandle *th,
		 const struct lu_attr *attr)
{
	return osp_sync_add_rec(env, lu2osp_dev(o->opo_obj.do_lu.lo_dev),
				lu_object_fid(&o->opo_obj.do_lu), type, 1,
				th, attr);
}
458
/**
 * Add an unlink record for \a lost objects starting at \a fid.
 *
 * NOTE(review): presumably used to destroy objects in a gap of the
 * precreated sequence -- confirm against the callers.
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] d		OSP device
 * \param[in] fid	first object of the range
 * \param[in] lost	number of objects in the range
 * \param[in] th	transaction handle (local)
 *
 * \retval 0		always, see osp_sync_add_rec()
 */
int osp_sync_gap(const struct lu_env *env, struct osp_device *d,
			struct lu_fid *fid, int lost, struct thandle *th)
{
	return osp_sync_add_rec(env, d, fid, MDS_UNLINK64_REC, lost, th, NULL);
}
464
465 /*
466  * it's quite obvious we can't maintain all the structures in the memory:
467  * while OST is down, MDS can be processing thousands and thousands of unlinks
 * filling persistent llogs and in-core representation
469  *
470  * this doesn't scale at all. so we need basically the following:
471  * a) destroy/setattr append llog records
472  * b) once llog has grown to X records, we process first Y committed records
473  *
474  *  once record R is found via llog_process(), it becomes committed after any
475  *  subsequent commit callback (at the most)
476  */
477
/**
 * ptlrpc commit callback.
 *
 * The callback is called by PTLRPC when a RPC is reported committed by the
 * target (OST). We register the callback for every RPC applying a change
 * from the llog. This way we know when the llog records can be cancelled.
 * Notice the callback can be called when OSP is finishing. We can detect
 * this by checking that actual transno in the request is less or equal of
 * known committed transno (see osp_sync_process_committed() for the details).
 * XXX: this is pretty expensive and can be improved later using batching.
 *
 * \param[in] req	request
 */
static void osp_sync_request_commit_cb(struct ptlrpc_request *req)
{
	struct osp_device *d = req->rq_cb_data;
	struct osp_job_req_args *jra;

	CDEBUG(D_HA, "commit req %p, transno %llu\n", req, req->rq_transno);

	/* no transno - the request never reached a transaction on the OST */
	if (unlikely(req->rq_transno == 0))
		return;

	/* do not do any opd_sync_rpcs_* accounting here
	 * it's done in osp_sync_interpret sooner or later */
	LASSERT(d);

	jra = ptlrpc_req_async_args(req);
	LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
	LASSERT(list_empty(&jra->jra_committed_link));

	/* keep the request alive until osp_sync_process_committed() */
	ptlrpc_request_addref(req);

	spin_lock(&d->opd_sync_lock);
	list_add(&jra->jra_committed_link, &d->opd_sync_committed_there);
	spin_unlock(&d->opd_sync_lock);

	/* XXX: some batching wouldn't hurt */
	wake_up(&d->opd_sync_waitq);
}
518
/**
 * RPC interpretation callback.
 *
 * The callback is called by ptlrpc when RPC is replied. Now we have to decide
 * whether we should:
 *  - put request on a special list to wait until it's committed by the target,
 *    if the request is successful
 *  - schedule llog record cancel if no target object is found
 *  - try later (essentially after reboot) in case of unexpected error
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] req	request replied
 * \param[in] aa	callback data
 * \param[in] rc	result of RPC
 *
 * \retval 0		always
 */
static int osp_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req, void *aa, int rc)
{
	struct osp_device *d = req->rq_cb_data;
	struct osp_job_req_args *jra = aa;

	if (jra->jra_magic != OSP_JOB_MAGIC) {
		DEBUG_REQ(D_ERROR, req, "bad magic %u\n", jra->jra_magic);
		LBUG();
	}
	LASSERT(d);

	CDEBUG(D_HA, "reply req %p/%d, rc %d, transno %u\n", req,
	       atomic_read(&req->rq_refcount),
	       rc, (unsigned) req->rq_transno);
	/* a successful reply must carry a transno */
	LASSERT(rc || req->rq_transno);

	if (rc == -ENOENT) {
		/*
		 * we tried to destroy object or update attributes,
		 * but object doesn't exist anymore - cancel llog record
		 */
		LASSERT(req->rq_transno == 0);
		LASSERT(list_empty(&jra->jra_committed_link));

		/* keep the request alive for osp_sync_process_committed() */
		ptlrpc_request_addref(req);

		spin_lock(&d->opd_sync_lock);
		list_add(&jra->jra_committed_link,
			 &d->opd_sync_committed_there);
		spin_unlock(&d->opd_sync_lock);

		wake_up(&d->opd_sync_waitq);
	} else if (rc) {
		struct obd_import *imp = req->rq_import;
		/*
		 * error happened, we'll try to repeat on next boot ?
		 */
		LASSERTF(req->rq_transno == 0 ||
			 req->rq_import_generation < imp->imp_generation,
			 "transno %llu, rc %d, gen: req %d, imp %d\n",
			 req->rq_transno, rc, req->rq_import_generation,
			 imp->imp_generation);
		if (req->rq_transno == 0) {
			/* this is the last time we see the request
			 * if transno is not zero, then commit cb
			 * will be called at some point */
			LASSERT(atomic_read(&d->opd_sync_rpcs_in_progress) > 0);
			atomic_dec(&d->opd_sync_rpcs_in_progress);
		}

		wake_up(&d->opd_sync_waitq);
	} else if (d->opd_pre != NULL &&
		   unlikely(d->opd_pre_status == -ENOSPC)) {
		/*
		 * if current status is -ENOSPC (lack of free space on OST)
		 * then we should poll OST immediately once object destroy
		 * is replied
		 */
		osp_statfs_need_now(d);
	}

	/* the RPC is no longer in flight: drop it from the conflict list
	 * and update the in-flight accounting */
	spin_lock(&d->opd_sync_lock);
	list_del_init(&jra->jra_in_flight_link);
	spin_unlock(&d->opd_sync_lock);
	LASSERT(atomic_read(&d->opd_sync_rpcs_in_flight) > 0);
	atomic_dec(&d->opd_sync_rpcs_in_flight);
	if (unlikely(atomic_read(&d->opd_sync_barrier) > 0))
		wake_up(&d->opd_sync_barrier_waitq);
	CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
	       d->opd_obd->obd_name, atomic_read(&d->opd_sync_rpcs_in_flight),
	       atomic_read(&d->opd_sync_rpcs_in_progress));

	osp_sync_check_for_work(d);

	return 0;
}
613
/**
 * Add request to ptlrpc queue.
 *
 * This is just a tiny helper function to put the request on the sending list
 *
 * \param[in] d		OSP device
 * \param[in] llh	llog handle where the record is stored
 * \param[in] h		llog record
 * \param[in] req	request
 */
static void osp_sync_send_new_rpc(struct osp_device *d,
				  struct llog_handle *llh,
				  struct llog_rec_hdr *h,
				  struct ptlrpc_request *req)
{
	struct osp_job_req_args *jra;

	LASSERT(atomic_read(&d->opd_sync_rpcs_in_flight) <=
		d->opd_sync_max_rpcs_in_flight);

	/* remember the llog cookie so the record can be cancelled once
	 * the change is committed by the target */
	jra = ptlrpc_req_async_args(req);
	jra->jra_magic = OSP_JOB_MAGIC;
	jra->jra_lcookie.lgc_lgl = llh->lgh_id;
	jra->jra_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
	jra->jra_lcookie.lgc_index = h->lrh_index;
	INIT_LIST_HEAD(&jra->jra_committed_link);
	/* publish on the in-flight list for conflict detection */
	spin_lock(&d->opd_sync_lock);
	list_add_tail(&jra->jra_in_flight_link, &d->opd_sync_in_flight_list);
	spin_unlock(&d->opd_sync_lock);

	ptlrpcd_add_req(req);
}
646
647
/**
 * Allocate and prepare RPC for a new change.
 *
 * The function allocates and initializes an RPC which will be sent soon to
 * apply the change to the target OST. The request is initialized from the
 * llog record passed. Notice only the fields common to all type of changes
 * are initialized.
 *
 * \param[in] d		OSP device
 * \param[in] op	type of the change
 * \param[in] format	request format to be used
 *
 * \retval pointer		new request on success
 * \retval ERR_PTR(errno)	on error
 */
static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
					       ost_cmd_t op,
					       const struct req_format *format)
{
	struct ptlrpc_request	*req;
	struct obd_import	*imp;
	int			 rc;

	/* Prepare the request */
	imp = d->opd_obd->u.cli.cl_import;
	LASSERT(imp);

	/* fault injection: simulate allocation failure */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CHECK_ENOMEM))
		RETURN(ERR_PTR(-ENOMEM));

	req = ptlrpc_request_alloc(imp, format);
	if (req == NULL)
		RETURN(ERR_PTR(-ENOMEM));

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, op);
	if (rc) {
		ptlrpc_req_finished(req);
		return ERR_PTR(rc);
	}

	/* hook up the reply/commit callbacks driving the sync machinery */
	req->rq_interpret_reply = osp_sync_interpret;
	req->rq_commit_cb = osp_sync_request_commit_cb;
	req->rq_cb_data = d;

	ptlrpc_request_set_replen(req);

	return req;
}
696
/**
 * Generate a request for setattr change.
 *
 * The function prepares a new RPC, initializes it with setattr specific
 * bits and send the RPC.
 *
 * \param[in] d		OSP device
 * \param[in] llh	llog handle where the record is stored
 * \param[in] h		llog record
 *
 * \retval 0		on success
 * \retval 1		on invalid record
 * \retval negative	negated errno on error
 */
static int osp_sync_new_setattr_job(struct osp_device *d,
				    struct llog_handle *llh,
				    struct llog_rec_hdr *h)
{
	struct llog_setattr64_rec	*rec = (struct llog_setattr64_rec *)h;
	struct ptlrpc_request		*req;
	struct ost_body			*body;

	ENTRY;
	LASSERT(h->lrh_type == MDS_SETATTR64_REC);

	/* fault injection: simulate a corrupted record */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSP_CHECK_INVALID_REC))
		RETURN(1);

	/* lsr_valid can only be 0 or HAVE OBD_MD_{FLUID, FLGID, FLPROJID} set,
	 * so no bits other than these should be set. */
	if ((rec->lsr_valid & ~(OBD_MD_FLUID | OBD_MD_FLGID |
	    OBD_MD_FLPROJID)) != 0) {
		CERROR("%s: invalid setattr record, lsr_valid:%llu\n",
			d->opd_obd->obd_name, rec->lsr_valid);
		/* return 1 on invalid record */
		RETURN(1);
	}

	req = osp_sync_new_job(d, OST_SETATTR, &RQF_OST_SETATTR);
	if (IS_ERR(req))
		RETURN(PTR_ERR(req));

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	body->oa.o_oi = rec->lsr_oi;
	body->oa.o_uid = rec->lsr_uid;
	body->oa.o_gid = rec->lsr_gid;
	body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
	/* the v2 record is longer and additionally carries a projid */
	if (h->lrh_len > sizeof(struct llog_setattr64_rec))
		body->oa.o_projid = ((struct llog_setattr64_rec_v2 *)
				      rec)->lsr_projid;

	/* old setattr record (prior 2.6.0) doesn't have 'valid' stored,
	 * we assume that both UID and GID are valid in that case. */
	if (rec->lsr_valid == 0)
		body->oa.o_valid |= (OBD_MD_FLUID | OBD_MD_FLGID);
	else
		body->oa.o_valid |= rec->lsr_valid;

	osp_sync_send_new_rpc(d, llh, h, req);
	RETURN(0);
}
759
760 /**
761  * Generate a request for unlink change.
762  *
763  * The function prepares a new RPC, initializes it with unlink(destroy)
764  * specific bits and sends the RPC. The function is used to handle
765  * llog_unlink_rec which were used in the older versions of Lustre.
766  * Current version uses llog_unlink_rec64.
767  *
768  * \param[in] d         OSP device
769  * \param[in] llh       llog handle where the record is stored
770  * \param[in] h         llog record
771  *
772  * \retval 0            on success
773  * \retval negative     negated errno on error
774  */
775 static int osp_sync_new_unlink_job(struct osp_device *d,
776                                    struct llog_handle *llh,
777                                    struct llog_rec_hdr *h)
778 {
779         struct llog_unlink_rec  *rec = (struct llog_unlink_rec *)h;
780         struct ptlrpc_request   *req;
781         struct ost_body         *body;
782         int rc;
783
784         ENTRY;
785         LASSERT(h->lrh_type == MDS_UNLINK_REC);
786
787         req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
788         if (IS_ERR(req))
789                 RETURN(PTR_ERR(req));
790
791         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
792         LASSERT(body);
793         ostid_set_seq(&body->oa.o_oi, rec->lur_oseq);
794         rc = ostid_set_id(&body->oa.o_oi, rec->lur_oid);
795         if (rc)
796                 return rc;
797         body->oa.o_misc = rec->lur_count;
798         body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
799         if (rec->lur_count)
800                 body->oa.o_valid |= OBD_MD_FLOBJCOUNT;
801
802         osp_sync_send_new_rpc(d, llh, h, req);
803         RETURN(0);
804 }
805
806 /**
807  * Generate a request for unlink change.
808  *
809  * The function prepares a new RPC, initializes it with unlink(destroy)
810  * specific bits and sends the RPC. Depending on the target (MDT or OST)
811  * two different protocols are used. For MDT we use OUT (basically OSD API
812  * updates transferred via a network). For OST we still use the old
813  * protocol (OBD?), originally for compatibility. Later we can start to
814  * use OUT for OST as well, this will allow batching and better code
815  * unification.
816  *
817  * \param[in] d         OSP device
818  * \param[in] llh       llog handle where the record is stored
819  * \param[in] h         llog record
820  *
821  * \retval 0            on success
822  * \retval negative     negated errno on error
823  */
824 static int osp_sync_new_unlink64_job(struct osp_device *d,
825                                      struct llog_handle *llh,
826                                      struct llog_rec_hdr *h)
827 {
828         struct llog_unlink64_rec        *rec = (struct llog_unlink64_rec *)h;
829         struct ptlrpc_request           *req = NULL;
830         struct ost_body                 *body;
831         int                              rc;
832
833         ENTRY;
834         LASSERT(h->lrh_type == MDS_UNLINK64_REC);
835         req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
836         if (IS_ERR(req))
837                 RETURN(PTR_ERR(req));
838
839         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
840         if (body == NULL)
841                 RETURN(-EFAULT);
842         rc = fid_to_ostid(&rec->lur_fid, &body->oa.o_oi);
843         if (rc < 0)
844                 RETURN(rc);
845         body->oa.o_misc = rec->lur_count;
846         body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID |
847                            OBD_MD_FLOBJCOUNT;
848         osp_sync_send_new_rpc(d, llh, h, req);
849         RETURN(0);
850 }
851
/**
 * Process llog records.
 *
 * This function is called to process the llog records committed locally.
 * In the recovery model used by OSP we can apply a change to a remote
 * target once corresponding transaction (like posix unlink) is committed
 * locally so can't revert.
 * Depending on the llog record type, a given handler is called that is
 * responsible for preparing and sending the RPC to apply the change.
 * Special record type LLOG_GEN_REC marking a reboot is cancelled right away.
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] d		OSP device
 * \param[in] llh	llog handle where the record is stored
 * \param[in] rec	llog record
 */
static void osp_sync_process_record(const struct lu_env *env,
				    struct osp_device *d,
				    struct llog_handle *llh,
				    struct llog_rec_hdr *rec)
{
	struct llog_handle	*cathandle = llh->u.phd.phd_cat_handle;
	struct llog_cookie	 cookie;
	int			 rc = 0;

	ENTRY;

	/* build the cookie identifying this record so it can be
	 * cancelled later (either right away for generation records,
	 * or from the commit callback path for regular ones) */
	cookie.lgc_lgl = llh->lgh_id;
	cookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
	cookie.lgc_index = rec->lrh_index;

	/* remember where we are in the catalog so a restarted scan can
	 * resume from this point (see osp_sync_thread) */
	d->opd_sync_last_catalog_idx = llh->lgh_hdr->llh_cat_idx;

	if (unlikely(rec->lrh_type == LLOG_GEN_REC)) {
		struct llog_gen_rec *gen = (struct llog_gen_rec *)rec;

		/* we're waiting for the record generated by this instance */
		LASSERT(d->opd_sync_prev_done == 0);
		if (!memcmp(&d->opd_sync_generation, &gen->lgr_gen,
			    sizeof(gen->lgr_gen))) {
			CDEBUG(D_HA, "processed all old entries\n");
			d->opd_sync_prev_done = 1;
		}

		/* cancel any generation record */
		rc = llog_cat_cancel_records(env, cathandle, 1, &cookie);

		RETURN_EXIT;
	}

	/*
	 * now we prepare and fill requests to OST, put them on the queue
	 * and fire after next commit callback
	 */

	/* notice we increment counters before sending RPC, to be consistent
	 * in RPC interpret callback which may happen very quickly */
	atomic_inc(&d->opd_sync_rpcs_in_flight);
	atomic_inc(&d->opd_sync_rpcs_in_progress);

	switch (rec->lrh_type) {
	/* case MDS_UNLINK_REC is kept for compatibility */
	case MDS_UNLINK_REC:
		rc = osp_sync_new_unlink_job(d, llh, rec);
		break;
	case MDS_UNLINK64_REC:
		rc = osp_sync_new_unlink64_job(d, llh, rec);
		break;
	case MDS_SETATTR64_REC:
		rc = osp_sync_new_setattr_job(d, llh, rec);
		break;
	default:
		CERROR("%s: unknown record type: %x\n", d->opd_obd->obd_name,
		       rec->lrh_type);
		/* treat "unknown record type" as "invalid" */
		rc = 1;
		break;
	}

	/* For all kinds of records, not matter successful or not,
	 * we should decrease changes and bump last_processed_id.
	 */
	if (d->opd_sync_prev_done) {
		__u64 correct_id = osp_sync_correct_id(d, rec);
		LASSERT(atomic_read(&d->opd_sync_changes) > 0);
		LASSERT(correct_id <= d->opd_sync_last_committed_id);
		/* NOTE: it's possible to meet same id if
		 * OST stores few stripes of same file
		 */
		while (1) {
			/* another thread may be trying to set new value */
			rmb();
			/* advance last_processed_id monotonically; loop and
			 * re-read so a concurrent writer's larger value is
			 * never overwritten with a smaller one */
			if (correct_id > d->opd_sync_last_processed_id) {
				d->opd_sync_last_processed_id = correct_id;
				wake_up(&d->opd_sync_barrier_waitq);
			} else
				break;
		}
		atomic_dec(&d->opd_sync_changes);
	}
	/* the job was not sent (error or invalid record): nothing in flight,
	 * so roll back the counters incremented above */
	if (rc != 0) {
		atomic_dec(&d->opd_sync_rpcs_in_flight);
		atomic_dec(&d->opd_sync_rpcs_in_progress);
	}

	CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
	       d->opd_obd->obd_name, atomic_read(&d->opd_sync_rpcs_in_flight),
	       atomic_read(&d->opd_sync_rpcs_in_progress));

	/* Delete the invalid record */
	if (rc == 1) {
		rc = llog_cat_cancel_records(env, cathandle, 1, &cookie);
		if (rc != 0)
			CERROR("%s: can't delete invalid record: "
			       "fid = "DFID", rec_id = %u, rc = %d\n",
			       d->opd_obd->obd_name,
			       PFID(lu_object_fid(&cathandle->lgh_obj->do_lu)),
			       rec->lrh_id, rc);
	}

	CDEBUG(D_HA, "found record %x, %d, idx %u, id %u\n",
	       rec->lrh_type, rec->lrh_len, rec->lrh_index, rec->lrh_id);

	RETURN_EXIT;
}
977
/**
 * Cancel llog records for the committed changes.
 *
 * The function walks through the list of the committed RPCs and cancels
 * corresponding llog records. see osp_sync_request_commit_cb() for the
 * details.
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] d		OSP device
 */
static void osp_sync_process_committed(const struct lu_env *env,
				       struct osp_device *d)
{
	struct obd_device	*obd = d->opd_obd;
	struct obd_import	*imp = obd->u.cli.cl_import;
	struct ost_body		*body;
	struct ptlrpc_request	*req;
	struct llog_ctxt	*ctxt;
	struct llog_handle	*llh;
	struct list_head	 list;
	int			 rc, done = 0;

	ENTRY;

	/* quick unlocked check; the list is re-checked under the lock below */
	if (list_empty(&d->opd_sync_committed_there))
		return;

	/*
	 * if current status is -ENOSPC (lack of free space on OST)
	 * then we should poll OST immediately once object destroy
	 * is committed.
	 * notice: we do this upon commit as well because some backends
	 * (like DMU) do not release space right away.
	 */
	if (d->opd_pre != NULL && unlikely(d->opd_pre_status == -ENOSPC))
		osp_statfs_need_now(d);

	/*
	 * now cancel them all
	 * XXX: can we improve this using some batching?
	 *      with batch RPC that'll happen automatically?
	 * XXX: can we store ctxt in lod_device and save few cycles ?
	 */
	ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
	LASSERT(ctxt);

	llh = ctxt->loc_handle;
	LASSERT(llh);

	/* detach the whole committed list onto a private one under the
	 * spinlock, so the (possibly slow) cancellation below runs
	 * without holding opd_sync_lock */
	INIT_LIST_HEAD(&list);
	spin_lock(&d->opd_sync_lock);
	list_splice(&d->opd_sync_committed_there, &list);
	INIT_LIST_HEAD(&d->opd_sync_committed_there);
	spin_unlock(&d->opd_sync_lock);

	while (!list_empty(&list)) {
		struct osp_job_req_args	*jra;

		jra = list_entry(list.next, struct osp_job_req_args,
				 jra_committed_link);
		LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
		list_del_init(&jra->jra_committed_link);

		/* jra lives inside the request's rq_async_args; recover
		 * the owning request from it */
		req = container_of((void *)jra, struct ptlrpc_request,
				   rq_async_args);
		body = req_capsule_client_get(&req->rq_pill,
					      &RMF_OST_BODY);
		LASSERT(body);
		/* import can be closing, thus all commit cb's are
		 * called we can check committness directly */
		if (req->rq_import_generation == imp->imp_generation) {
			rc = llog_cat_cancel_records(env, llh, 1,
						     &jra->jra_lcookie);
			if (rc)
				CERROR("%s: can't cancel record: %d\n",
				       obd->obd_name, rc);
		} else {
			DEBUG_REQ(D_OTHER, req, "imp_committed = %llu",
				  imp->imp_peer_committed_transno);
		}
		ptlrpc_req_finished(req);
		done++;
	}

	llog_ctxt_put(ctxt);

	/* each cancelled job retires one in-progress RPC */
	LASSERT(atomic_read(&d->opd_sync_rpcs_in_progress) >= done);
	atomic_sub(done, &d->opd_sync_rpcs_in_progress);
	CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
	       d->opd_obd->obd_name, atomic_read(&d->opd_sync_rpcs_in_flight),
	       atomic_read(&d->opd_sync_rpcs_in_progress));

	osp_sync_check_for_work(d);

	/* wake up the thread if requested to stop:
	 * it might be waiting for in-progress to complete */
	if (unlikely(osp_sync_running(d) == 0))
		wake_up(&d->opd_sync_waitq);

	EXIT;
}
1079
/**
 * The core of the syncing mechanism.
 *
 * This is a callback called by the llog processing function. Essentially it
 * suspends llog processing until there is a record to process (it's supposed
 * to be committed locally). The function handles RPCs committed by the target
 * and cancels corresponding llog records.
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] llh	llog handle we're processing
 * \param[in] rec	current llog record
 * \param[in] data	callback data containing a pointer to the device
 *
 * \retval 0			to ask the caller (llog_process()) to continue
 * \retval LLOG_PROC_BREAK	to ask the caller to break
 */
static int osp_sync_process_queues(const struct lu_env *env,
				   struct llog_handle *llh,
				   struct llog_rec_hdr *rec,
				   void *data)
{
	struct osp_device	*d = data;

	/* loop: a single callback invocation may process the passed record,
	 * then keep servicing commit completions until a new record can be
	 * requested (return 0) or processing must stop (LLOG_PROC_BREAK) */
	do {
		struct l_wait_info lwi = { 0 };

		if (!osp_sync_running(d)) {
			CDEBUG(D_HA, "stop llog processing\n");
			return LLOG_PROC_BREAK;
		}

		/* process requests committed by OST */
		osp_sync_process_committed(env, d);

		/* if we there are changes to be processed and we have
		 * resources for this ... do now */
		if (osp_sync_can_process_new(d, rec)) {
			if (llh == NULL) {
				/* ask llog for another record */
				CDEBUG(D_HA, "%u changes, %u in progress,"
				     " %u in flight\n",
				     atomic_read(&d->opd_sync_changes),
				     atomic_read(&d->opd_sync_rpcs_in_progress),
				     atomic_read(&d->opd_sync_rpcs_in_flight));
				return 0;
			}
			osp_sync_process_record(env, d, llh, rec);
			/* record consumed: clear llh/rec so the next
			 * iteration asks for a fresh record instead of
			 * re-processing this one */
			llh = NULL;
			rec = NULL;
		}

		/* all known ids are processed: the tracker entry is no
		 * longer needed for this device */
		if (d->opd_sync_last_processed_id == d->opd_sync_last_used_id)
			osp_sync_remove_from_tracker(d);

		l_wait_event(d->opd_sync_waitq,
			     !osp_sync_running(d) ||
			     osp_sync_can_process_new(d, rec) ||
			     !list_empty(&d->opd_sync_committed_there),
			     &lwi);
	} while (1);
}
1141
/**
 * OSP sync thread.
 *
 * This thread runs llog_cat_process() scanner calling our callback
 * to process llog records. in the callback we implement tricky
 * state machine as we don't want to start scanning of the llog again
 * and again, also we don't want to process too many records and send
 * too many RPCs a time. so, depending on current load (num of changes
 * being synced to OST) the callback can suspend awaiting for some
 * new conditions, like syncs completed.
 *
 * In order to process llog records left by previous boots and to allow
 * llog_process_thread() to find something (otherwise it'd just exit
 * immediately) we add a special GENERATATION record on each boot.
 *
 * \param[in] _arg	a pointer to thread's arguments
 *
 * \retval 0		on success
 * \retval negative	negated errno on error
 */
static int osp_sync_thread(void *_arg)
{
	struct osp_device	*d = _arg;
	struct ptlrpc_thread	*thread = &d->opd_sync_thread;
	struct l_wait_info	 lwi = { 0 };
	struct llog_ctxt	*ctxt;
	struct obd_device	*obd = d->opd_obd;
	struct llog_handle	*llh;
	struct lu_env		 env;
	int			 rc, count;
	bool			 wrapped;

	ENTRY;

	rc = lu_env_init(&env, LCT_LOCAL);
	if (rc) {
		CERROR("%s: can't initialize env: rc = %d\n",
		       obd->obd_name, rc);

		/* report SVC_STOPPED so osp_sync_init() waiting on
		 * t_ctl_waitq does not block forever */
		spin_lock(&d->opd_sync_lock);
		thread->t_flags = SVC_STOPPED;
		spin_unlock(&d->opd_sync_lock);
		wake_up(&thread->t_ctl_waitq);

		RETURN(rc);
	}

	/* announce the thread is up; osp_sync_init() waits for this */
	spin_lock(&d->opd_sync_lock);
	thread->t_flags = SVC_RUNNING;
	spin_unlock(&d->opd_sync_lock);
	wake_up(&thread->t_ctl_waitq);

	ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
	if (ctxt == NULL) {
		CERROR("can't get appropriate context\n");
		GOTO(out, rc = -EINVAL);
	}

	llh = ctxt->loc_handle;
	if (llh == NULL) {
		CERROR("can't get llh\n");
		llog_ctxt_put(ctxt);
		GOTO(out, rc = -EINVAL);
	}

	/*
	 * Catalog processing stops when it processed last catalog record
	 * with index equal to the end of catalog bitmap. Or if it is wrapped,
	 * processing stops with index equal to the lgh_last_idx. We need to
	 * continue processing.
	 */
	d->opd_sync_last_catalog_idx = 0;
	do {
		int	size;

		/* a wrapped catalog has its read pointer at or past the
		 * write pointer while holding more than one plain llog */
		wrapped = (llh->lgh_hdr->llh_cat_idx >= llh->lgh_last_idx &&
			   llh->lgh_hdr->llh_count > 1);

		rc = llog_cat_process(&env, llh, osp_sync_process_queues, d,
				      d->opd_sync_last_catalog_idx, 0);

		size = OBD_FAIL_PRECHECK(OBD_FAIL_CAT_RECORDS) ?
		       cfs_fail_val : (LLOG_HDR_BITMAP_SIZE(llh->lgh_hdr) - 1);
		/* processing reaches catalog bottom */
		if (d->opd_sync_last_catalog_idx == size)
			d->opd_sync_last_catalog_idx = 0;
		else if (wrapped)
			/* If catalog is wrapped we can`t predict last index of
			 * processing because lgh_last_idx could be changed.
			 * Starting form the next one */
			d->opd_sync_last_catalog_idx++;

	} while (rc == 0 && (wrapped || d->opd_sync_last_catalog_idx == 0));

	if (rc < 0) {
		CERROR("%s: llog process with osp_sync_process_queues "
		       "failed: %d\n", d->opd_obd->obd_name, rc);
		GOTO(close, rc);
	}
	LASSERTF(rc == 0 || rc == LLOG_PROC_BREAK,
		 "%u changes, %u in progress, %u in flight: %d\n",
		 atomic_read(&d->opd_sync_changes),
		 atomic_read(&d->opd_sync_rpcs_in_progress),
		 atomic_read(&d->opd_sync_rpcs_in_flight), rc);

	/* we don't expect llog_process_thread() to exit till umount */
	LASSERTF(thread->t_flags != SVC_RUNNING,
		 "%u changes, %u in progress, %u in flight\n",
		 atomic_read(&d->opd_sync_changes),
		 atomic_read(&d->opd_sync_rpcs_in_progress),
		 atomic_read(&d->opd_sync_rpcs_in_flight));

	/* wait till all the requests are completed */
	count = 0;
	while (atomic_read(&d->opd_sync_rpcs_in_progress) > 0) {
		osp_sync_process_committed(&env, d);

		/* poll every 5 seconds; give up (assert) after 10
		 * consecutive timeouts with work still outstanding */
		lwi = LWI_TIMEOUT(cfs_time_seconds(5), NULL, NULL);
		rc = l_wait_event(d->opd_sync_waitq,
				atomic_read(&d->opd_sync_rpcs_in_progress) == 0,
				  &lwi);
		if (rc == -ETIMEDOUT)
			count++;
		LASSERTF(count < 10, "%s: %d %d %sempty\n",
			 d->opd_obd->obd_name,
			 atomic_read(&d->opd_sync_rpcs_in_progress),
			 atomic_read(&d->opd_sync_rpcs_in_flight),
			 list_empty(&d->opd_sync_committed_there) ? "" : "!");

	}

close:
	llog_cat_close(&env, llh);
	rc = llog_cleanup(&env, ctxt);
	if (rc)
		CERROR("can't cleanup llog: %d\n", rc);
out:
	LASSERTF(atomic_read(&d->opd_sync_rpcs_in_progress) == 0,
		 "%s: %d %d %sempty\n", d->opd_obd->obd_name,
		 atomic_read(&d->opd_sync_rpcs_in_progress),
		 atomic_read(&d->opd_sync_rpcs_in_flight),
		 list_empty(&d->opd_sync_committed_there) ? "" : "!");

	thread->t_flags = SVC_STOPPED;

	wake_up(&thread->t_ctl_waitq);

	lu_env_fini(&env);

	RETURN(0);
}
1293
/**
 * Initialize llog.
 *
 * Initializes the llog. Specific llog to be used depends on the type of the
 * target OSP represents (OST or MDT). The function adds appends a new llog
 * record to mark the place where the records associated with this boot
 * start.
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] d		OSP device
 *
 * \retval 0		on success
 * \retval negative	negated errno on error
 */
static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d)
{
	struct osp_thread_info	*osi = osp_env_info(env);
	struct lu_fid		*fid = &osi->osi_fid;
	struct llog_handle	*lgh = NULL;
	struct obd_device	*obd = d->opd_obd;
	struct llog_ctxt	*ctxt;
	int			rc;

	ENTRY;

	LASSERT(obd);

	/*
	 * open llog corresponding to our OST
	 */
	OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
	obd->obd_lvfs_ctxt.dt = d->opd_storage;

	lu_local_obj_fid(fid, LLOG_CATALOGS_OID);

	/* read the stored catalog id for this target index */
	rc = llog_osd_get_cat_list(env, d->opd_storage, d->opd_index, 1,
				   &osi->osi_cid, fid);
	if (rc < 0) {
		if (rc != -EFAULT) {
			CERROR("%s: can't get id from catalogs: rc = %d\n",
			       obd->obd_name, rc);
			RETURN(rc);
		}

		/* After sparse OST indices is supported, the CATALOG file
		 * may become a sparse file that results in failure on
		 * reading. Skip this error as the llog will be created
		 * later */
		memset(&osi->osi_cid, 0, sizeof(osi->osi_cid));
		rc = 0;
	}

	CDEBUG(D_INFO, "%s: Init llog for %d - catid "DFID":%x\n",
	       obd->obd_name, d->opd_index,
	       PFID(&osi->osi_cid.lci_logid.lgl_oi.oi_fid),
	       osi->osi_cid.lci_logid.lgl_ogen);

	rc = llog_setup(env, obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT,
			d->opd_storage->dd_lu_dev.ld_obd,
			&osp_mds_ost_orig_logops);
	if (rc)
		RETURN(rc);

	ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
	LASSERT(ctxt);

	/* a non-zero stored id means a catalog already exists: try to
	 * open it; fall through to creation if it has gone missing */
	if (likely(logid_id(&osi->osi_cid.lci_logid) != 0)) {
		rc = llog_open(env, ctxt, &lgh, &osi->osi_cid.lci_logid, NULL,
			       LLOG_OPEN_EXISTS);
		/* re-create llog if it is missing */
		if (rc == -ENOENT)
			logid_set_id(&osi->osi_cid.lci_logid, 0);
		else if (rc < 0)
			GOTO(out_cleanup, rc);
	}

	if (unlikely(logid_id(&osi->osi_cid.lci_logid) == 0)) {
		rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
		if (rc < 0)
			GOTO(out_cleanup, rc);
		osi->osi_cid.lci_logid = lgh->lgh_id;
	}

	LASSERT(lgh != NULL);
	ctxt->loc_handle = lgh;

	rc = llog_init_handle(env, lgh, LLOG_F_IS_CAT, NULL);
	if (rc)
		GOTO(out_close, rc);

	/* persist the (possibly new) catalog id for the next boot */
	rc = llog_osd_put_cat_list(env, d->opd_storage, d->opd_index, 1,
				   &osi->osi_cid, fid);
	if (rc)
		GOTO(out_close, rc);

	/*
	 * put a mark in the llog till which we'll be processing
	 * old records restless
	 */
	d->opd_sync_generation.mnt_cnt = cfs_time_current();
	d->opd_sync_generation.conn_cnt = cfs_time_current();

	osi->osi_hdr.lrh_type = LLOG_GEN_REC;
	osi->osi_hdr.lrh_len = sizeof(osi->osi_gen);

	memcpy(&osi->osi_gen.lgr_gen, &d->opd_sync_generation,
	       sizeof(osi->osi_gen.lgr_gen));

	rc = llog_cat_add(env, lgh, &osi->osi_gen.lgr_hdr, &osi->osi_cookie);
	if (rc < 0)
		GOTO(out_close, rc);
	llog_ctxt_put(ctxt);
	RETURN(0);
out_close:
	llog_cat_close(env, lgh);
out_cleanup:
	llog_cleanup(env, ctxt);
	RETURN(rc);
}
1413
1414 /**
1415  * Cleanup llog used for syncing.
1416  *
1417  * Closes and cleanups the llog. The function is called when the device is
1418  * shutting down.
1419  *
1420  * \param[in] env       LU environment provided by the caller
1421  * \param[in] d         OSP device
1422  */
1423 static void osp_sync_llog_fini(const struct lu_env *env, struct osp_device *d)
1424 {
1425         struct llog_ctxt *ctxt;
1426
1427         ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
1428         if (ctxt) {
1429                 llog_cat_close(env, ctxt->loc_handle);
1430                 llog_cleanup(env, ctxt);
1431         }
1432 }
1433
/**
 * Initialization of the sync component of OSP.
 *
 * Initializes the llog and starts a new thread to handle the changes to
 * the remote target (OST or MDT).
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] d		OSP device
 *
 * \retval 0		on success
 * \retval negative	negated errno on error
 */
int osp_sync_init(const struct lu_env *env, struct osp_device *d)
{
	struct l_wait_info	 lwi = { 0 };
	struct task_struct	*task;
	int			 rc;

	ENTRY;

	d->opd_sync_max_rpcs_in_flight = OSP_MAX_RPCS_IN_FLIGHT;
	d->opd_sync_max_rpcs_in_progress = OSP_MAX_RPCS_IN_PROGRESS;
	spin_lock_init(&d->opd_sync_lock);
	init_waitqueue_head(&d->opd_sync_waitq);
	init_waitqueue_head(&d->opd_sync_barrier_waitq);
	thread_set_flags(&d->opd_sync_thread, SVC_INIT);
	init_waitqueue_head(&d->opd_sync_thread.t_ctl_waitq);
	INIT_LIST_HEAD(&d->opd_sync_in_flight_list);
	INIT_LIST_HEAD(&d->opd_sync_committed_there);

	/* read-only storage: no changes can be logged, nothing to sync */
	if (d->opd_storage->dd_rdonly)
		RETURN(0);

	rc = osp_sync_id_traction_init(d);
	if (rc)
		RETURN(rc);

	/*
	 * initialize llog storing changes
	 */
	rc = osp_sync_llog_init(env, d);
	if (rc) {
		CERROR("%s: can't initialize llog: rc = %d\n",
		       d->opd_obd->obd_name, rc);
		GOTO(err_id, rc);
	}

	/*
	 * Start synchronization thread
	 */
	task = kthread_run(osp_sync_thread, d, "osp-syn-%u-%u",
			   d->opd_index, d->opd_group);
	if (IS_ERR(task)) {
		rc = PTR_ERR(task);
		CERROR("%s: cannot start sync thread: rc = %d\n",
		       d->opd_obd->obd_name, rc);
		GOTO(err_llog, rc);
	}

	/* wait until the thread reports either RUNNING or STOPPED
	 * (the thread signals t_ctl_waitq in both cases) */
	l_wait_event(d->opd_sync_thread.t_ctl_waitq,
		     osp_sync_running(d) || osp_sync_stopped(d), &lwi);

	RETURN(0);
err_llog:
	osp_sync_llog_fini(env, d);
err_id:
	osp_sync_id_traction_fini(d);
	return rc;
}
1503
1504 /**
1505  * Stop the syncing thread.
1506  *
1507  * Asks the syncing thread to stop and wait until it's stopped.
1508  *
1509  * \param[in] d         OSP device
1510  *
1511  * \retval              0
1512  */
1513 int osp_sync_fini(struct osp_device *d)
1514 {
1515         struct ptlrpc_thread *thread = &d->opd_sync_thread;
1516
1517         ENTRY;
1518
1519         if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
1520                 thread->t_flags = SVC_STOPPING;
1521                 wake_up(&d->opd_sync_waitq);
1522                 wait_event(thread->t_ctl_waitq, thread_is_stopped(thread));
1523         }
1524
1525         /*
1526          * unregister transaction callbacks only when sync thread
1527          * has finished operations with llog
1528          */
1529         osp_sync_id_traction_fini(d);
1530
1531         RETURN(0);
1532 }
1533
/* global registry of per-OSD commit trackers: OSP devices sharing the
 * same local OSD share one tracker (see osp_sync_id_traction_init());
 * the list is protected by osp_id_tracker_sem */
static DEFINE_MUTEX(osp_id_tracker_sem);
static struct list_head osp_id_tracker_list =
		LIST_HEAD_INIT(osp_id_tracker_list);
1537
/**
 * OSD commit callback.
 *
 * The function is used as a local OSD commit callback to track the highest
 * committed llog record id. see osp_sync_id_traction_init() for the details.
 *
 * \param[in] th	local transaction handle committed
 * \param[in] cookie	commit callback data (our private structure)
 */
static void osp_sync_tracker_commit_cb(struct thandle *th, void *cookie)
{
	struct osp_id_tracker	*tr = cookie;
	struct osp_device	*d;
	struct osp_txn_info	*txn;

	LASSERT(tr);

	/* cheap unlocked pre-check to skip transactions that carry no id
	 * or cannot advance the committed id; re-checked under otr_lock */
	txn = osp_txn_info(&th->th_ctx);
	if (txn == NULL || txn->oti_current_id < tr->otr_committed_id)
		return;

	spin_lock(&tr->otr_lock);
	if (likely(txn->oti_current_id > tr->otr_committed_id)) {
		CDEBUG(D_OTHER, "committed: %llu -> %llu\n",
		       tr->otr_committed_id, txn->oti_current_id);
		tr->otr_committed_id = txn->oti_current_id;

		/* propagate the new committed id to every OSP on this OSD
		 * and wake their sync threads to process new records */
		list_for_each_entry(d, &tr->otr_wakeup_list,
				    opd_sync_ontrack) {
			d->opd_sync_last_committed_id = tr->otr_committed_id;
			wake_up(&d->opd_sync_waitq);
		}
	}
	spin_unlock(&tr->otr_lock);
}
1573
1574 /**
1575  * Initialize commit tracking mechanism.
1576  *
1577  * Some setups may have thousands of OSTs and each will be represented by OSP.
1578  * Meaning order of magnitute many more changes to apply every second. In order
1579  * to keep the number of commit callbacks low this mechanism was introduced.
1580  * The mechanism is very similar to transno used by MDT service: it's an single
1581  * ID stream which can be assigned by any OSP to its llog records. The tricky
1582  * part is that ID is stored in per-transaction data and re-used by all the OSPs
1583  * involved in that transaction. Then all these OSPs are woken up utilizing a single OSD commit callback.
1584  *
1585  * The function initializes the data used by the tracker described above.
1586  * A singler tracker per OSD device is created.
1587  *
1588  * \param[in] d         OSP device
1589  *
1590  * \retval 0            on success
1591  * \retval negative     negated errno on error
1592  */
1593 static int osp_sync_id_traction_init(struct osp_device *d)
1594 {
1595         struct osp_id_tracker   *tr, *found = NULL;
1596         int                      rc = 0;
1597
1598         LASSERT(d);
1599         LASSERT(d->opd_storage);
1600         LASSERT(d->opd_sync_tracker == NULL);
1601         INIT_LIST_HEAD(&d->opd_sync_ontrack);
1602
1603         mutex_lock(&osp_id_tracker_sem);
1604         list_for_each_entry(tr, &osp_id_tracker_list, otr_list) {
1605                 if (tr->otr_dev == d->opd_storage) {
1606                         LASSERT(atomic_read(&tr->otr_refcount));
1607                         atomic_inc(&tr->otr_refcount);
1608                         d->opd_sync_tracker = tr;
1609                         found = tr;
1610                         break;
1611                 }
1612         }
1613
1614         if (found == NULL) {
1615                 rc = -ENOMEM;
1616                 OBD_ALLOC_PTR(tr);
1617                 if (tr) {
1618                         d->opd_sync_tracker = tr;
1619                         spin_lock_init(&tr->otr_lock);
1620                         tr->otr_dev = d->opd_storage;
1621                         tr->otr_next_id = 1;
1622                         tr->otr_committed_id = 0;
1623                         atomic_set(&tr->otr_refcount, 1);
1624                         INIT_LIST_HEAD(&tr->otr_wakeup_list);
1625                         list_add(&tr->otr_list, &osp_id_tracker_list);
1626                         tr->otr_tx_cb.dtc_txn_commit =
1627                                                 osp_sync_tracker_commit_cb;
1628                         tr->otr_tx_cb.dtc_cookie = tr;
1629                         tr->otr_tx_cb.dtc_tag = LCT_MD_THREAD;
1630                         dt_txn_callback_add(d->opd_storage, &tr->otr_tx_cb);
1631                         rc = 0;
1632                 }
1633         }
1634         mutex_unlock(&osp_id_tracker_sem);
1635
1636         return rc;
1637 }
1638
1639 /**
1640  * Release commit tracker.
1641  *
1642  * Decrease a refcounter on the tracker used by the given OSP device \a d.
1643  * If no more users left, then the tracker is released.
1644  *
1645  * \param[in] d         OSP device
1646  */
1647 static void osp_sync_id_traction_fini(struct osp_device *d)
1648 {
1649         struct osp_id_tracker *tr;
1650
1651         ENTRY;
1652
1653         LASSERT(d);
1654         tr = d->opd_sync_tracker;
1655         if (tr == NULL) {
1656                 EXIT;
1657                 return;
1658         }
1659
1660         osp_sync_remove_from_tracker(d);
1661
1662         mutex_lock(&osp_id_tracker_sem);
1663         if (atomic_dec_and_test(&tr->otr_refcount)) {
1664                 dt_txn_callback_del(d->opd_storage, &tr->otr_tx_cb);
1665                 LASSERT(list_empty(&tr->otr_wakeup_list));
1666                 list_del(&tr->otr_list);
1667                 OBD_FREE_PTR(tr);
1668                 d->opd_sync_tracker = NULL;
1669         }
1670         mutex_unlock(&osp_id_tracker_sem);
1671
1672         EXIT;
1673 }
1674
1675 /**
1676  * Generate a new ID on a tracker.
1677  *
1678  * Generates a new ID using the tracker associated with the given OSP device
1679  * \a d, if the given ID \a id is non-zero. Unconditially adds OSP device to
1680  * the wakeup list, so OSP won't miss when a transaction using the ID is
1681  * committed.
1682  *
1683  * \param[in] d         OSP device
1684  * \param[in] id        0 or ID generated previously
1685  *
1686  * \retval              ID the caller should use
1687  */
1688 static __u64 osp_sync_id_get(struct osp_device *d, __u64 id)
1689 {
1690         struct osp_id_tracker *tr;
1691
1692         tr = d->opd_sync_tracker;
1693         LASSERT(tr);
1694
1695         /* XXX: we can improve this introducing per-cpu preallocated ids? */
1696         spin_lock(&tr->otr_lock);
1697         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_TRACK_OVERFLOW))
1698                 tr->otr_next_id = 0xfffffff0;
1699
1700         if (unlikely(tr->otr_next_id <= d->opd_sync_last_used_id)) {
1701                 spin_unlock(&tr->otr_lock);
1702                 CERROR("%s: next %llu, last synced %llu\n",
1703                        d->opd_obd->obd_name, tr->otr_next_id,
1704                        d->opd_sync_last_used_id);
1705                 LBUG();
1706         }
1707
1708         if (id == 0)
1709                 id = tr->otr_next_id++;
1710         if (id > d->opd_sync_last_used_id)
1711                 d->opd_sync_last_used_id = id;
1712         if (list_empty(&d->opd_sync_ontrack))
1713                 list_add(&d->opd_sync_ontrack, &tr->otr_wakeup_list);
1714         spin_unlock(&tr->otr_lock);
1715         CDEBUG(D_OTHER, "new id %llu\n", id);
1716
1717         return id;
1718 }
1719
1720 /**
1721  * Stop to propagate commit status to OSP.
1722  *
1723  * If the OSP does not have any llog records she's waiting to commit, then
1724  * it is possible to unsubscribe from wakeups from the tracking using this
1725  * method.
1726  *
1727  * \param[in] d         OSP device not willing to wakeup
1728  */
1729 static void osp_sync_remove_from_tracker(struct osp_device *d)
1730 {
1731         struct osp_id_tracker *tr;
1732
1733         tr = d->opd_sync_tracker;
1734         LASSERT(tr);
1735
1736         if (list_empty(&d->opd_sync_ontrack))
1737                 return;
1738
1739         spin_lock(&tr->otr_lock);
1740         list_del_init(&d->opd_sync_ontrack);
1741         spin_unlock(&tr->otr_lock);
1742 }
1743