Whamcloud - gitweb
LU-17662 osd-zfs: Support for ZFS 2.2.3
[fs/lustre-release.git] / lustre / osp / osp_sync.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/osp/osp_sync.c
32  *
33  * Lustre OST Proxy Device
34  *
35  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
36  * Author: Mikhail Pershin <mike.pershin@intel.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_MDS
40
41 #include <linux/kthread.h>
42 #include <linux/delay.h>
43 #include <lustre_log.h>
44 #include <lustre_update.h>
45 #include "osp_internal.h"
46
/*
 * This is a component of OSP implementing synchronization between MDS and OST:
 * it LLOGs all interesting changes (e.g. UID/GID/projid, late striping, layout
 * version, and object destroy) atomically, then makes sure the changes hit OST
 * storage.
 *
 * The OSP syncing thread keeps on reading change records from LLOG. If there
 * are still idle slots for both in-flight RPCs (opd_sync_rpcs_in_flight) and
 * in-progress jobs (opd_sync_rpcs_in_progress), the thread will create one
 * syncing RPC for each LLOG record and send it to the corresponding OST for
 * remote committing. Once an RPC is reported committed by the OST (using the
 * regular last_committed mechanism), the job is enqueued (into
 * opd_sync_committed_there) for later cancellation of the corresponding LLOG
 * record.
 *
 * opd_sync_changes is the number of unread LLOG records (to be processed).
 * Please note that this number only includes the LLOG records newly
 * accumulated since the most recent start of the MDS service, and does not
 * include the LLOG records saved on disk before that. Thus, the actual number
 * of LLOG records might be larger than this.
 *
 * opd_sync_rpcs_in_flight is the number of in-flight RPCs. A syncing RPC is
 * in-flight until the MDS has received the reply from the corresponding OST
 * (no matter whether the remote commit succeeded or failed). The total number
 * of in-flight RPCs is limited by OSP_MAX_RPCS_IN_FLIGHT.
 *
 * opd_sync_rpcs_in_progress is the number of RPCs that are still in progress.
 * A syncing RPC is considered to be in progress until its LLOG record gets
 * canceled. As we can see, an in-flight RPC is obviously still in progress.
 * The total number of in-progress RPCs is limited by OSP_MAX_RPCS_IN_PROGRESS.
 */
77
78 #define OSP_MAX_RPCS_IN_FLIGHT          8
79 #define OSP_MAX_RPCS_IN_PROGRESS        4096
80 #define OSP_MAX_SYNC_CHANGES            2000000000
81
82 #define OSP_JOB_MAGIC           0x26112005
83
/* Per-RPC job state, stored in the request's rq_async_args. */
struct osp_job_req_args {
        /** bytes reserved for ptlrpc_replay_req() */
        struct ptlrpc_replay_async_args jra_raa;
        /* linkage into opd_sync_committed_there once the target commits */
        struct list_head                jra_committed_link;
        /* linkage into opd_sync_in_flight_list while the RPC is in flight */
        struct list_head                jra_in_flight_link;
        /* cookie of the llog record to cancel after the remote commit */
        struct llog_cookie              jra_lcookie;
        /* OSP_JOB_MAGIC, sanity-checked wherever a jra is dereferenced */
        __u32                           jra_magic;
};
92
93 static int osp_sync_add_commit_cb(const struct lu_env *env,
94                                   struct osp_device *d, struct thandle *th);
95
/**
 * Check for new changes to sync
 *
 * \param[in] d         OSP device
 *
 * \retval 1            there are changes
 * \retval 0            there are no changes
 */
104 static inline int osp_sync_has_new_job(struct osp_device *d)
105 {
106         return atomic_read(&d->opd_sync_changes) > 0 ||
107                 d->opd_sync_prev_done == 0;
108 }
109
/* Check whether record \a h targets the same OST object as any RPC that is
 * currently in flight; such a record must not be sent concurrently, so the
 * caller (see osp_sync_can_process_new()) holds it back.
 *
 * \retval 1	a conflicting in-flight RPC exists (or the record id is bad)
 * \retval 0	no conflict, the record may be processed now
 */
static inline int osp_sync_in_flight_conflict(struct osp_device *d,
					     struct llog_rec_hdr *h)
{
        struct osp_job_req_args *jra;
        struct ost_id            ostid;
        int                      conflict = 0;

        /* generation records name no object; with an empty in-flight
         * list there is nothing to conflict with */
        if (h == NULL || h->lrh_type == LLOG_GEN_REC ||
            list_empty(&d->opd_sync_in_flight_list))
                return conflict;

        /* extract the target object id from the record, per record type */
        memset(&ostid, 0, sizeof(ostid));
        switch (h->lrh_type) {
        case MDS_UNLINK_REC: {
                struct llog_unlink_rec *unlink = (struct llog_unlink_rec *)h;

                ostid_set_seq(&ostid, unlink->lur_oseq);
                if (ostid_set_id(&ostid, unlink->lur_oid)) {
                        CERROR("Bad %llu to set " DOSTID "\n",
                               (unsigned long long)(unlink->lur_oid),
                               POSTID(&ostid));
                        /* unparsable id: report a conflict to be safe */
                        return 1;
                }
                }
                break;
        case MDS_UNLINK64_REC:
                fid_to_ostid(&((struct llog_unlink64_rec *)h)->lur_fid, &ostid);
                break;
        case MDS_SETATTR64_REC:
                ostid = ((struct llog_setattr64_rec *)h)->lsr_oi;
                break;
        default:
                LBUG();
        }

        /* compare against the object of every RPC currently in flight */
        spin_lock(&d->opd_sync_lock);
        list_for_each_entry(jra, &d->opd_sync_in_flight_list,
                            jra_in_flight_link) {
                struct ptlrpc_request   *req;
                struct ost_body         *body;

                LASSERT(jra->jra_magic == OSP_JOB_MAGIC);

                /* jra lives inside its request's rq_async_args */
                req = container_of((void *)jra, struct ptlrpc_request,
                                   rq_async_args);
                body = req_capsule_client_get(&req->rq_pill,
                                              &RMF_OST_BODY);
                LASSERT(body);

                if (memcmp(&ostid, &body->oa.o_oi, sizeof(ostid)) == 0) {
                        conflict = 1;
                        break;
                }
        }
        spin_unlock(&d->opd_sync_lock);

        return conflict;
}
168
169 static inline int osp_sync_rpcs_in_progress_low(struct osp_device *d)
170 {
171         return atomic_read(&d->opd_sync_rpcs_in_progress) <
172                 d->opd_sync_max_rpcs_in_progress;
173 }
174
175 /**
176  * Check for room in the network pipe to OST
177  *
178  * \param[in] d         OSP device
179  *
180  * \retval 1            there is room
181  * \retval 0            no room, the pipe is full
182  */
183 static inline int osp_sync_rpcs_in_flight_low(struct osp_device *d)
184 {
185         return atomic_read(&d->opd_sync_rpcs_in_flight) <
186                 d->opd_sync_max_rpcs_in_flight;
187 }
188
189 /**
190  * Wake up check for the main sync thread
191  *
192  * \param[in] d         OSP device
193  *
194  * \retval 1            time to wake up
195  * \retval 0            no need to wake up
196  */
197 static inline int osp_sync_has_work(struct osp_device *osp)
198 {
199         /* has new/old changes and low in-progress? */
200         if (osp_sync_has_new_job(osp) && osp_sync_rpcs_in_progress_low(osp) &&
201             osp_sync_rpcs_in_flight_low(osp) && osp->opd_imp_connected)
202                 return 1;
203
204         /* has remotely committed? */
205         if (!list_empty(&osp->opd_sync_committed_there))
206                 return 1;
207
208         return 0;
209 }
210
211 void osp_sync_check_for_work(struct osp_device *osp)
212 {
213         if (osp_sync_has_work(osp))
214                 wake_up(&osp->opd_sync_waitq);
215 }
216
217 static inline __u64 osp_sync_correct_id(struct osp_device *d,
218                                         struct llog_rec_hdr *rec)
219 {
220         /*
221          * llog use cyclic store with 32 bit lrh_id
222          * so overflow lrh_id is possible. Range between
223          * last_processed and last_committed is less than
224          * 64745 ^ 2 and less than 2^32 - 1
225          */
226         __u64 correct_id = d->opd_sync_last_committed_id;
227
228         if ((correct_id & 0xffffffffULL) < rec->lrh_id)
229                 correct_id -= 0x100000000ULL;
230
231         correct_id &= ~0xffffffffULL;
232         correct_id |= rec->lrh_id;
233
234         return correct_id;
235 }
236
237 /**
238  * Check and return ready-for-new status.
239  *
240  * The thread processing llog record uses this function to check whether
241  * it's time to take another record and process it. The number of conditions
242  * must be met: the connection should be ready, RPCs in flight not exceeding
243  * the limit, the record is committed locally, etc (see the lines below).
244  *
245  * \param[in] d         OSP device
246  * \param[in] rec       next llog record to process
247  *
248  * \retval 0            not ready
249  * \retval 1            ready
250  */
static inline int osp_sync_can_process_new(struct osp_device *d,
                                           struct llog_rec_hdr *rec)
{
        LASSERT(d);

        /* a sync barrier is in force: hold off any new processing */
        if (unlikely(atomic_read(&d->opd_sync_barrier) > 0))
                return 0;
        /* an in-flight RPC already targets the same object */
        if (unlikely(osp_sync_in_flight_conflict(d, rec)))
                return 0;
        /* too many RPCs whose llog records are not cancelled yet */
        if (!osp_sync_rpcs_in_progress_low(d))
                return 0;
        /* the network pipe to the OST is full */
        if (!osp_sync_rpcs_in_flight_low(d))
                return 0;
        /* no connection to send the RPC on */
        if (!d->opd_imp_connected)
                return 0;
        /* records logged before the last MDS start are still being
         * drained: process unconditionally (see file-head comment) */
        if (d->opd_sync_prev_done == 0)
                return 1;
        /* nothing new accumulated since the service started */
        if (atomic_read(&d->opd_sync_changes) == 0)
                return 0;
        /* no specific record given but some work exists */
        if (rec == NULL)
                return 1;
        /* notice "<" not "<=" */
        if (osp_sync_correct_id(d, rec) < d->opd_sync_last_committed_id)
                return 1;
        return 0;
}
277
278 /**
279  * Declare intention to add a new change.
280  *
281  * With regard to OSD API, we have to declare any changes ahead. In this
282  * case we declare an intention to add a llog record representing the
283  * change on the local storage.
284  *
285  * \param[in] env       LU environment provided by the caller
286  * \param[in] o         OSP object
287  * \param[in] type      type of change: MDS_UNLINK64_REC or MDS_SETATTR64_REC
288  * \param[in] th        transaction handle (local)
289  *
290  * \retval 0            on success
291  * \retval negative     negated errno on error
292  */
293 int osp_sync_declare_add(const struct lu_env *env, struct osp_object *o,
294                          enum llog_op_type type, struct thandle *th)
295 {
296         struct osp_thread_info  *osi = osp_env_info(env);
297         struct osp_device       *d = lu2osp_dev(o->opo_obj.do_lu.lo_dev);
298         struct llog_ctxt        *ctxt;
299         struct thandle          *storage_th;
300         int                      rc;
301
302         ENTRY;
303
304         /* it's a layering violation, to access internals of th,
305          * but we can do this as a sanity check, for a while */
306         LASSERT(th->th_top != NULL);
307
308         if (atomic_read(&d->opd_sync_changes) > d->opd_sync_max_changes) {
309                 /* Attempt to slow sync changes queuing rate */
310                 CWARN("%s: queued changes counter exceeds limit %d > %d\n",
311                       d->opd_obd->obd_name, atomic_read(&d->opd_sync_changes),
312                       d->opd_sync_max_changes);
313                 RETURN(-EINPROGRESS);
314         }
315         storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage);
316         if (IS_ERR(storage_th))
317                 RETURN(PTR_ERR(storage_th));
318
319         switch (type) {
320         case MDS_UNLINK64_REC:
321                 osi->osi_hdr.lrh_len = sizeof(struct llog_unlink64_rec);
322                 break;
323         case MDS_SETATTR64_REC:
324                 osi->osi_hdr.lrh_len = sizeof(struct llog_setattr64_rec_v2);
325                 break;
326         default:
327                 LBUG();
328         }
329
330         ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
331         if (!ctxt) {
332                 /* for a reason OSP wasn't able to open llog,
333                  * just skip logging this operation and hope
334                  * LFSCK will fix it eventually */
335                 CERROR("logging isn't available, run LFSCK\n");
336                 RETURN(0);
337         }
338
339         rc = llog_declare_add(env, ctxt->loc_handle, &osi->osi_hdr,
340                               storage_th);
341         llog_ctxt_put(ctxt);
342
343         RETURN(rc);
344 }
345
/**
 * Generate a llog record for a given change.
 *
 * Generates a llog record for the change passed. The change can be of two
 * types: unlink and setattr. The record gets an ID which later will be
 * used to track commit status of the change. For unlink changes, the caller
 * can supply a starting FID and the count of the objects to destroy. For
 * setattr, the caller should supply the attributes to apply.
 *
 * \param[in] env       LU environment provided by the caller
 * \param[in] d         OSP device
 * \param[in] fid       fid of the object the change should be applied to
 * \param[in] type      type of change: MDS_UNLINK64_REC or MDS_SETATTR64_REC
 * \param[in] count     count of objects to destroy
 * \param[in] th        transaction handle (local)
 * \param[in] attr      attributes for setattr
 *
 * \retval 0            on success
 * \retval negative     negated errno on error
 */
static int osp_sync_add_rec(const struct lu_env *env, struct osp_device *d,
                            const struct lu_fid *fid, enum llog_op_type type,
                            int count, struct thandle *th,
                            const struct lu_attr *attr)
{
        struct osp_thread_info  *osi = osp_env_info(env);
        struct llog_ctxt        *ctxt;
        struct thandle          *storage_th;
        bool                     immediate_commit_cb = false;
        int                      rc;

        ENTRY;

        /* it's a layering violation, to access internals of th,
         * but we can do this as a sanity check, for a while */
        LASSERT(th->th_top != NULL);
        storage_th = thandle_get_sub_by_dt(env, th->th_top, d->opd_storage);
        if (IS_ERR(storage_th))
                RETURN(PTR_ERR(storage_th));

        /* fill the per-type record body kept in per-thread info */
        switch (type) {
        case MDS_UNLINK64_REC:
                osi->osi_hdr.lrh_len = sizeof(osi->osi_unlink);
                osi->osi_hdr.lrh_type = MDS_UNLINK64_REC;
                osi->osi_unlink.lur_fid  = *fid;
                osi->osi_unlink.lur_count = count;
                break;
        case MDS_SETATTR64_REC:
                rc = fid_to_ostid(fid, &osi->osi_oi);
                LASSERT(rc == 0);
                osi->osi_hdr.lrh_len = sizeof(osi->osi_setattr);
                osi->osi_hdr.lrh_type = MDS_SETATTR64_REC;
                osi->osi_setattr.lsr_oi  = osi->osi_oi;
                LASSERT(attr);
                osi->osi_setattr.lsr_uid = attr->la_uid;
                osi->osi_setattr.lsr_gid = attr->la_gid;
                osi->osi_setattr.lsr_layout_version = attr->la_layout_version;
                osi->osi_setattr.lsr_projid = attr->la_projid;
                /* translate the LA_* validity flags to OBD_MD_* ones */
                osi->osi_setattr.lsr_valid =
                        ((attr->la_valid & LA_UID) ? OBD_MD_FLUID : 0) |
                        ((attr->la_valid & LA_GID) ? OBD_MD_FLGID : 0) |
                        ((attr->la_valid & LA_PROJID) ? OBD_MD_FLPROJID : 0);
                if (attr->la_valid & LA_LAYOUT_VERSION) {
                        osi->osi_setattr.lsr_valid |= OBD_MD_LAYOUT_VERSION;

                        /* FLR: the layout version has to be transferred to
                         * OST objects ASAP, otherwise clients will have to
                         * experience delay to be able to write OST objects. */
                        immediate_commit_cb = true;
                }
                break;
        default:
                LBUG();
        }

        /* we keep the same id, but increment it when the callback
         * is registered, so that all records upto the one taken
         * by the callback are subject to processing */
        spin_lock(&d->opd_sync_lock);
        osi->osi_hdr.lrh_id = d->opd_sync_last_used_id;
        spin_unlock(&d->opd_sync_lock);

        ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
        if (ctxt == NULL) {
                /* see comment in osp_sync_declare_add() */
                RETURN(0);
        }

        rc = llog_add(env, ctxt->loc_handle, &osi->osi_hdr, &osi->osi_cookie,
                      storage_th);
        llog_ctxt_put(ctxt);

        if (likely(rc >= 0)) {
                CDEBUG(D_OTHER, "%s: new record "DFID".%u: rc = %d\n",
                       d->opd_obd->obd_name,
                       PLOGID(&osi->osi_cookie.lgc_lgl),
                       osi->osi_cookie.lgc_index, rc);
                atomic_inc(&d->opd_sync_changes);
        }

        /* register a commit callback to kick the sync thread; FLR layout
         * version updates want it immediately, others may wait 1s */
        if (immediate_commit_cb)
                rc = osp_sync_add_commit_cb(env, d, th);
        else
                rc = osp_sync_add_commit_cb_1s(env, d, th);

        /* return 0 always here, error case just cause no llog record */
        RETURN(0);
}
455
456 int osp_sync_add(const struct lu_env *env, struct osp_object *o,
457                  enum llog_op_type type, struct thandle *th,
458                  const struct lu_attr *attr)
459 {
460         return osp_sync_add_rec(env, lu2osp_dev(o->opo_obj.do_lu.lo_dev),
461                                 lu_object_fid(&o->opo_obj.do_lu), type, 1,
462                                 th, attr);
463 }
464
465 /*
466  * it's quite obvious we can't maintain all the structures in the memory:
467  * while OST is down, MDS can be processing thousands and thousands of unlinks
468  * filling persistent llogs and in-core respresentation
469  *
470  * this doesn't scale at all. so we need basically the following:
471  * a) destroy/setattr append llog records
472  * b) once llog has grown to X records, we process first Y committed records
473  *
474  *  once record R is found via llog_process(), it becomes committed after any
475  *  subsequent commit callback (at the most)
476  */
477
/**
 * ptlrpc commit callback.
 *
 * The callback is called by PTLRPC when an RPC is reported committed by the
 * target (OST). We register the callback for every RPC applying a change
 * from the llog. This way we know when the llog records can be cancelled.
 * Notice the callback can be called when OSP is finishing. We can detect this
 * by checking that the actual transno in the request is less than or equal to
 * the known committed transno (see osp_sync_process_committed() for details).
 * XXX: this is pretty expensive and can be improved later using batching.
 *
 * \param[in] req       request
 */
static void osp_sync_request_commit_cb(struct ptlrpc_request *req)
{
        struct osp_device *d = req->rq_cb_data;
        struct osp_job_req_args *jra;

        CDEBUG(D_HA, "commit req %p, transno %llu\n", req, req->rq_transno);

        /* no transno was assigned, so nothing was committed remotely
         * and there is no llog record to cancel */
        if (unlikely(req->rq_transno == 0))
                return;

        /* do not do any opd_sync_rpcs_* accounting here
         * it's done in osp_sync_interpret sooner or later */
        LASSERT(d);

        jra = ptlrpc_req_async_args(jra, req);
        LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
        LASSERT(list_empty(&jra->jra_committed_link));

        /* keep the request alive until the sync thread has cancelled
         * the corresponding llog record */
        ptlrpc_request_addref(req);

        spin_lock(&d->opd_sync_lock);
        list_add(&jra->jra_committed_link, &d->opd_sync_committed_there);
        spin_unlock(&d->opd_sync_lock);

        /* XXX: some batching wouldn't hurt */
        wake_up(&d->opd_sync_waitq);
}
518
519 /**
520  * RPC interpretation callback.
521  *
522  * The callback is called by ptlrpc when RPC is replied. Now we have to decide
523  * whether we should:
524  *  - put request on a special list to wait until it's committed by the target,
525  *    if the request is successful
526  *  - schedule llog record cancel if no target object is found
527  *  - try later (essentially after reboot) in case of unexpected error
528  *
529  * \param[in] env       LU environment provided by the caller
530  * \param[in] req       request replied
531  * \param[in] aa        callback data
532  * \param[in] rc        result of RPC
533  *
534  * \retval 0            always
535  */
static int osp_sync_interpret(const struct lu_env *env,
                              struct ptlrpc_request *req, void *args, int rc)
{
        struct osp_job_req_args *jra = args;
        struct osp_device *d = req->rq_cb_data;

        if (jra->jra_magic != OSP_JOB_MAGIC) {
                DEBUG_REQ(D_ERROR, req, "bad magic %u", jra->jra_magic);
                LBUG();
        }
        LASSERT(d);

        CDEBUG(D_HA, "reply req %p/%d, rc %d, transno %u\n", req,
               atomic_read(&req->rq_refcount),
               rc, (unsigned) req->rq_transno);

        if (rc == -ENOENT) {
                /*
                 * we tried to destroy object or update attributes,
                 * but object doesn't exist anymore - cancel llog record
                 */
                LASSERT(req->rq_transno == 0);
                LASSERT(list_empty(&jra->jra_committed_link));

                /* keep the request alive until the record is cancelled */
                ptlrpc_request_addref(req);

                spin_lock(&d->opd_sync_lock);
                list_add(&jra->jra_committed_link,
                         &d->opd_sync_committed_there);
                spin_unlock(&d->opd_sync_lock);

                wake_up(&d->opd_sync_waitq);
        } else if (rc) {
                struct obd_import *imp = req->rq_import;
                /*
                 * error happened, we'll try to repeat on next boot ?
                 */
                LASSERTF(req->rq_transno == 0 || rc == -EIO || rc == -EROFS ||
                         req->rq_import_generation < imp->imp_generation,
                         "transno %llu, rc %d, gen: req %d, imp %d\n",
                         req->rq_transno, rc, req->rq_import_generation,
                         imp->imp_generation);
                if (req->rq_transno == 0) {
                        /* this is the last time we see the request
                         * if transno is not zero, then commit cb
                         * will be called at some point */
                        LASSERT(atomic_read(&d->opd_sync_rpcs_in_progress) > 0);
                        atomic_dec(&d->opd_sync_rpcs_in_progress);
                }

                wake_up(&d->opd_sync_waitq);
        } else if (d->opd_pre != NULL &&
                   unlikely(d->opd_pre_status == -ENOSPC)) {
                /*
                 * if current status is -ENOSPC (lack of free space on OST)
                 * then we should poll OST immediately once object destroy
                 * is replied
                 */
                osp_statfs_need_now(d);
        }

        /* the RPC is no longer in flight: remove it from the conflict
         * list and update the in-flight accounting */
        spin_lock(&d->opd_sync_lock);
        list_del_init(&jra->jra_in_flight_link);
        spin_unlock(&d->opd_sync_lock);
        LASSERT(atomic_read(&d->opd_sync_rpcs_in_flight) > 0);
        atomic_dec(&d->opd_sync_rpcs_in_flight);
        /* a sync barrier may be waiting for all in-flight RPCs to drain */
        if (unlikely(atomic_read(&d->opd_sync_barrier) > 0))
                wake_up(&d->opd_sync_barrier_waitq);
        CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
               d->opd_obd->obd_name, atomic_read(&d->opd_sync_rpcs_in_flight),
               atomic_read(&d->opd_sync_rpcs_in_progress));

        osp_sync_check_for_work(d);

        return 0;
}
612
/*
 * Add request to ptlrpc queue.
 *
 * This is just a tiny helper function to put the request on the sending list
 *
 * \param[in] d         OSP device
 * \param[in] llh       llog handle where the record is stored
 * \param[in] h         llog record
 * \param[in] req       request
 */
static void osp_sync_send_new_rpc(struct osp_device *d,
                                  struct llog_handle *llh,
                                  struct llog_rec_hdr *h,
                                  struct ptlrpc_request *req)
{
        struct osp_job_req_args *jra;

        LASSERT(atomic_read(&d->opd_sync_rpcs_in_flight) <=
                d->opd_sync_max_rpcs_in_flight);

        /* stash the llog cookie in the request's async args so the
         * record can be cancelled once the change commits remotely */
        jra = ptlrpc_req_async_args(jra, req);
        jra->jra_magic = OSP_JOB_MAGIC;
        jra->jra_lcookie.lgc_lgl = llh->lgh_id;
        jra->jra_lcookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
        jra->jra_lcookie.lgc_index = h->lrh_index;
        INIT_LIST_HEAD(&jra->jra_committed_link);
        /* make the job visible to osp_sync_in_flight_conflict() */
        spin_lock(&d->opd_sync_lock);
        list_add_tail(&jra->jra_in_flight_link, &d->opd_sync_in_flight_list);
        spin_unlock(&d->opd_sync_lock);

        ptlrpcd_add_req(req);
}
645
646
647 /**
648  * Allocate and prepare RPC for a new change.
649  *
650  * The function allocates and initializes an RPC which will be sent soon to
651  * apply the change to the target OST. The request is initialized from the
652  * llog record passed. Notice only the fields common to all type of changes
653  * are initialized.
654  *
655  * \param[in] d         OSP device
656  * \param[in] op        type of the change
657  * \param[in] format    request format to be used
658  *
659  * \retval pointer              new request on success
660  * \retval ERR_PTR(errno)       on error
661  */
662 static struct ptlrpc_request *osp_sync_new_job(struct osp_device *d,
663                                                enum ost_cmd op,
664                                                const struct req_format *format)
665 {
666         struct ptlrpc_request   *req;
667         struct obd_import       *imp;
668         int                      rc;
669
670         /* Prepare the request */
671         imp = d->opd_obd->u.cli.cl_import;
672         LASSERT(imp);
673
674         if (CFS_FAIL_CHECK(OBD_FAIL_OSP_CHECK_ENOMEM))
675                 RETURN(ERR_PTR(-ENOMEM));
676
677         req = ptlrpc_request_alloc(imp, format);
678         if (req == NULL)
679                 RETURN(ERR_PTR(-ENOMEM));
680
681         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, op);
682         if (rc) {
683                 ptlrpc_req_put(req);
684                 return ERR_PTR(rc);
685         }
686
687         req->rq_interpret_reply = osp_sync_interpret;
688         req->rq_commit_cb = osp_sync_request_commit_cb;
689         req->rq_cb_data = d;
690
691         ptlrpc_request_set_replen(req);
692
693         return req;
694 }
695
696 /**
697  * Generate a request for setattr change.
698  *
699  * The function prepares a new RPC, initializes it with setattr specific
700  * bits and send the RPC.
701  *
702  * \param[in] d         OSP device
703  * \param[in] llh       llog handle where the record is stored
704  * \param[in] h         llog record
705  *
706  * \retval 0            on success
707  * \retval 1            on invalid record
708  * \retval negative     negated errno on error
709  */
static int osp_sync_new_setattr_job(struct osp_device *d,
                                    struct llog_handle *llh,
                                    struct llog_rec_hdr *h)
{
        struct llog_setattr64_rec       *rec = (struct llog_setattr64_rec *)h;
        struct ptlrpc_request           *req;
        struct ost_body                 *body;

        ENTRY;
        LASSERT(h->lrh_type == MDS_SETATTR64_REC);

        if (CFS_FAIL_CHECK(OBD_FAIL_OSP_CHECK_INVALID_REC))
                RETURN(1);

        /* lsr_valid can only be 0 or HAVE OBD_MD_{FLUID, FLGID, FLPROJID} set,
         * so no bits other than these should be set. */
        if ((rec->lsr_valid & ~(OBD_MD_FLUID | OBD_MD_FLGID |
            OBD_MD_FLPROJID | OBD_MD_LAYOUT_VERSION)) != 0) {
                CERROR("%s: invalid setattr record, lsr_valid:%llu\n",
                        d->opd_obd->obd_name, rec->lsr_valid);
                /* return 1 on invalid record */
                RETURN(1);
        }

        req = osp_sync_new_job(d, OST_SETATTR, &RQF_OST_SETATTR);
        if (IS_ERR(req))
                RETURN(PTR_ERR(req));

        /* copy the record's attributes into the RPC body */
        body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
        LASSERT(body);
        body->oa.o_oi = rec->lsr_oi;
        body->oa.o_uid = rec->lsr_uid;
        body->oa.o_gid = rec->lsr_gid;
        body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
        /* a longer record is the v2 layout which additionally carries
         * projid and layout version */
        if (h->lrh_len > sizeof(struct llog_setattr64_rec)) {
                struct llog_setattr64_rec_v2 *rec_v2 = (typeof(rec_v2))rec;
                body->oa.o_projid = rec_v2->lsr_projid;
                body->oa.o_layout_version = rec_v2->lsr_layout_version;
        }

        /* old setattr record (prior 2.6.0) doesn't have 'valid' stored,
         * we assume that both UID and GID are valid in that case. */
        if (rec->lsr_valid == 0)
                body->oa.o_valid |= (OBD_MD_FLUID | OBD_MD_FLGID);
        else
                body->oa.o_valid |= rec->lsr_valid;

        osp_sync_send_new_rpc(d, llh, h, req);
        RETURN(0);
}
760
761 /**
762  * Generate a request for unlink change.
763  *
764  * The function prepares a new RPC, initializes it with unlink(destroy)
765  * specific bits and sends the RPC. The function is used to handle
766  * llog_unlink_rec which were used in the older versions of Lustre.
 * Current version uses llog_unlink64_rec.
768  *
769  * \param[in] d         OSP device
770  * \param[in] llh       llog handle where the record is stored
771  * \param[in] h         llog record
772  *
773  * \retval 0            on success
774  * \retval negative     negated errno on error
775  */
776 static int osp_sync_new_unlink_job(struct osp_device *d,
777                                    struct llog_handle *llh,
778                                    struct llog_rec_hdr *h)
779 {
780         struct llog_unlink_rec  *rec = (struct llog_unlink_rec *)h;
781         struct ptlrpc_request   *req;
782         struct ost_body         *body;
783         int rc;
784
785         ENTRY;
786         LASSERT(h->lrh_type == MDS_UNLINK_REC);
787
788         req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
789         if (IS_ERR(req))
790                 RETURN(PTR_ERR(req));
791
792         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
793         LASSERT(body);
794         ostid_set_seq(&body->oa.o_oi, rec->lur_oseq);
795         rc = ostid_set_id(&body->oa.o_oi, rec->lur_oid);
796         if (rc)
797                 return rc;
798         body->oa.o_misc = rec->lur_count;
799         body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID;
800         if (rec->lur_count)
801                 body->oa.o_valid |= OBD_MD_FLOBJCOUNT;
802
803         osp_sync_send_new_rpc(d, llh, h, req);
804         RETURN(0);
805 }
806
807 /**
808  * Generate a request for unlink change.
809  *
810  * The function prepares a new RPC, initializes it with unlink(destroy)
811  * specific bits and sends the RPC. Depending on the target (MDT or OST)
812  * two different protocols are used. For MDT we use OUT (basically OSD API
813  * updates transferred via a network). For OST we still use the old
814  * protocol (OBD?), originally for compatibility. Later we can start to
815  * use OUT for OST as well, this will allow batching and better code
816  * unification.
817  *
818  * \param[in] d         OSP device
819  * \param[in] llh       llog handle where the record is stored
820  * \param[in] h         llog record
821  *
822  * \retval 0            on success
823  * \retval negative     negated errno on error
824  */
825 static int osp_sync_new_unlink64_job(struct osp_device *d,
826                                      struct llog_handle *llh,
827                                      struct llog_rec_hdr *h)
828 {
829         struct llog_unlink64_rec        *rec = (struct llog_unlink64_rec *)h;
830         struct ptlrpc_request           *req = NULL;
831         struct ost_body                 *body;
832         int                              rc;
833
834         ENTRY;
835         LASSERT(h->lrh_type == MDS_UNLINK64_REC);
836         req = osp_sync_new_job(d, OST_DESTROY, &RQF_OST_DESTROY);
837         if (IS_ERR(req))
838                 RETURN(PTR_ERR(req));
839
840         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
841         if (body == NULL)
842                 RETURN(-EFAULT);
843         rc = fid_to_ostid(&rec->lur_fid, &body->oa.o_oi);
844         if (rc < 0)
845                 RETURN(rc);
846         body->oa.o_misc = rec->lur_count;
847         body->oa.o_valid = OBD_MD_FLGROUP | OBD_MD_FLID |
848                            OBD_MD_FLOBJCOUNT;
849         osp_sync_send_new_rpc(d, llh, h, req);
850         RETURN(0);
851 }
852
/**
 * Process llog records.
 *
 * This function is called to process the llog records committed locally.
 * In the recovery model used by OSP we can apply a change to a remote
 * target once corresponding transaction (like posix unlink) is committed
 * locally so can't revert.
 * Depending on the llog record type, a given handler is called that is
 * responsible for preparing and sending the RPC to apply the change.
 * Special record type LLOG_GEN_REC marking a reboot is cancelled right away.
 *
 * Note: the in-flight/in-progress counters are incremented before the
 * handler runs and rolled back only on handler failure; the commit
 * machinery (osp_sync_process_committed()) decrements them later.
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] d		OSP device
 * \param[in] llh	llog handle where the record is stored
 * \param[in] rec	llog record
 */
static void osp_sync_process_record(const struct lu_env *env,
				    struct osp_device *d,
				    struct llog_handle *llh,
				    struct llog_rec_hdr *rec)
{
	struct llog_handle	*cathandle = llh->u.phd.phd_cat_handle;
	struct llog_cookie	 cookie;
	int			 rc = 0;

	ENTRY;

	/* the cookie identifies this very record (log id + index) so it
	 * can be cancelled later, once the change is known committed */
	cookie.lgc_lgl = llh->lgh_id;
	cookie.lgc_subsys = LLOG_MDS_OST_ORIG_CTXT;
	cookie.lgc_index = rec->lrh_index;

	/* track the catalog index currently being processed */
	d->opd_sync_last_catalog_idx = llh->lgh_hdr->llh_cat_idx;

	if (unlikely(rec->lrh_type == LLOG_GEN_REC)) {
		struct llog_gen_rec *gen = (struct llog_gen_rec *)rec;

		/* we're waiting for the record generated by this instance */
		LASSERT(d->opd_sync_prev_done == 0);
		if (!memcmp(&d->opd_sync_generation, &gen->lgr_gen,
			    sizeof(gen->lgr_gen))) {
			CDEBUG(D_HA, "processed all old entries\n");
			d->opd_sync_prev_done = 1;
		}

		/* cancel any generation record */
		rc = llog_cat_cancel_records(env, cathandle, 1, &cookie);

		/* flush all pending records ASAP */
		osp_sync_force(env, d);

		RETURN_EXIT;
	}

	/*
	 * now we prepare and fill requests to OST, put them on the queue
	 * and fire after next commit callback
	 */

	/* notice we increment counters before sending RPC, to be consistent
	 * in RPC interpret callback which may happen very quickly */
	atomic_inc(&d->opd_sync_rpcs_in_flight);
	atomic_inc(&d->opd_sync_rpcs_in_progress);

	switch (rec->lrh_type) {
	/* case MDS_UNLINK_REC is kept for compatibility */
	case MDS_UNLINK_REC:
		rc = osp_sync_new_unlink_job(d, llh, rec);
		break;
	case MDS_UNLINK64_REC:
		rc = osp_sync_new_unlink64_job(d, llh, rec);
		break;
	case MDS_SETATTR64_REC:
		rc = osp_sync_new_setattr_job(d, llh, rec);
		break;
	default:
		CERROR("%s: unknown record type: %x\n", d->opd_obd->obd_name,
		       rec->lrh_type);
		/* treat "unknown record type" as "invalid" */
		rc = 1;
		break;
	}

	/* For all kinds of records, not matter successful or not,
	 * we should decrease changes and bump last_processed_id.
	 */
	if (d->opd_sync_prev_done) {
		LASSERT(atomic_read(&d->opd_sync_changes) > 0);
		atomic_dec(&d->opd_sync_changes);
		wake_up(&d->opd_sync_barrier_waitq);
	}
	atomic64_inc(&d->opd_sync_processed_recs);
	/* a handler failed: no RPC was sent, so undo the counter bump
	 * done above */
	if (rc != 0) {
		atomic_dec(&d->opd_sync_rpcs_in_flight);
		atomic_dec(&d->opd_sync_rpcs_in_progress);
	}

	CDEBUG(D_OTHER, "%s: %d in flight, %d in progress\n",
	       d->opd_obd->obd_name, atomic_read(&d->opd_sync_rpcs_in_flight),
	       atomic_read(&d->opd_sync_rpcs_in_progress));

	/* Delete the invalid record (rc == 1 marks a record that can never
	 * be handled; cancelling it avoids reprocessing it forever) */
	if (rc == 1) {
		rc = llog_cat_cancel_records(env, cathandle, 1, &cookie);
		if (rc != 0)
			CERROR("%s: can't delete invalid record: "
			       "fid = "DFID", rec_id = %u, rc = %d\n",
			       d->opd_obd->obd_name,
			       PFID(lu_object_fid(&cathandle->lgh_obj->do_lu)),
			       rec->lrh_id, rc);
	}

	CDEBUG(D_OTHER, "found record %x, %d, idx %u, id %u\n",
	       rec->lrh_type, rec->lrh_len, rec->lrh_index, rec->lrh_id);

	RETURN_EXIT;
}
969
/**
 * Cancel llog records for the committed changes.
 *
 * The function walks through the list of the committed RPCs and cancels
 * corresponding llog records. see osp_sync_request_commit_cb() for the
 * details.
 *
 * Cancels are batched: consecutive cookies belonging to the same plain
 * llog are collected into an array and cancelled in one call; a cookie
 * from a different llog flushes via the one-record path.
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] d		OSP device
 */
static void osp_sync_process_committed(const struct lu_env *env,
				       struct osp_device *d)
{
	struct obd_device	*obd = d->opd_obd;
	struct obd_import	*imp = obd->u.cli.cl_import;
	struct ost_body		*body;
	struct ptlrpc_request	*req;
	struct llog_ctxt	*ctxt;
	struct llog_handle	*llh;
	int			*arr, arr_size;
	LIST_HEAD(list);
	struct list_head	 *le;
	struct llog_logid	 lgid;
	int			 rc, i, count = 0, done = 0;

	ENTRY;

	if (list_empty(&d->opd_sync_committed_there))
		return;

	/*
	 * if current status is -ENOSPC (lack of free space on OST)
	 * then we should poll OST immediately once object destroy
	 * is committed.
	 * notice: we do this upon commit as well because some backends
	 * (like DMU) do not release space right away.
	 */
	if (d->opd_pre != NULL && unlikely(d->opd_pre_status == -ENOSPC))
		osp_statfs_need_now(d);

	/*
	 * now cancel them all
	 * XXX: can we improve this using some batching?
	 *      with batch RPC that'll happen automatically?
	 * XXX: can we store ctxt in lod_device and save few cycles ?
	 */
	ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
	LASSERT(ctxt);

	llh = ctxt->loc_handle;
	LASSERT(llh);

	/* detach the committed list under the lock so commit callbacks
	 * can keep appending while we process our private copy */
	spin_lock(&d->opd_sync_lock);
	list_splice(&d->opd_sync_committed_there, &list);
	INIT_LIST_HEAD(&d->opd_sync_committed_there);
	spin_unlock(&d->opd_sync_lock);

	list_for_each(le, &list)
		count++;
	/* batching pays off only for 3+ records; allocate a cookie-index
	 * array for them */
	if (count > 2) {
		arr_size = sizeof(int) * count;
		/* limit cookie array to order 2 */
		arr_size = arr_size < (PAGE_SIZE * 4) ? arr_size :
			(PAGE_SIZE * 4);
		OBD_ALLOC_LARGE(arr, arr_size);
	} else {
		arr = NULL;
		arr_size = 0;
	}
	i = 0;
	while (!list_empty(&list)) {
		struct osp_job_req_args *jra;

		jra = list_first_entry(&list, struct osp_job_req_args,
				       jra_committed_link);
		LASSERT(jra->jra_magic == OSP_JOB_MAGIC);
		list_del_init(&jra->jra_committed_link);

		req = container_of((void *)jra, struct ptlrpc_request,
				   rq_async_args);
		body = req_capsule_client_get(&req->rq_pill,
					      &RMF_OST_BODY);
		LASSERT(body);
		/* import can be closing, thus all commit cb's are
		 * called we can check committness directly */
		if (req->rq_import_generation == imp->imp_generation) {
			/* collect into the array while cookies stay within
			 * the same plain llog (lgid is latched on the first
			 * entry); otherwise cancel one record directly */
			if (arr && (!i ||
				    !memcmp(&jra->jra_lcookie.lgc_lgl, &lgid,
					   sizeof(lgid)))) {
				if (unlikely(!i))
					lgid = jra->jra_lcookie.lgc_lgl;

				arr[i++] = jra->jra_lcookie.lgc_index;
			} else {
				rc = llog_cat_cancel_records(env, llh, 1,
							     &jra->jra_lcookie);
				if (rc)
					CERROR("%s: can't cancel record: rc = %d\n",
					       obd->obd_name, rc);
			}
		} else {
			/* stale import generation: the record will be
			 * reprocessed after reconnect, just log it */
			DEBUG_REQ(D_OTHER, req, "imp_committed = %llu",
				  imp->imp_peer_committed_transno);
		}
		ptlrpc_req_put(req);
		done++;
		/* flush the batch when the array is full or the list is
		 * exhausted */
		if (arr &&
		    ((i * sizeof(int)) == arr_size ||
		     (list_empty(&list) && i > 0))) {
			rc = llog_cat_cancel_arr_rec(env, llh, &lgid, i, arr);

			if (rc)
				CERROR("%s: can't cancel %d records: rc = %d\n",
				       obd->obd_name, i, rc);
			else
				CDEBUG(D_OTHER, "%s: massive records cancel id "DFID" num %d\n",
				       obd->obd_name, PLOGID(&lgid), i);
			i = 0;
		}

	}

	if (arr)
		OBD_FREE_LARGE(arr, arr_size);

	llog_ctxt_put(ctxt);

	LASSERT(atomic_read(&d->opd_sync_rpcs_in_progress) >= done);
	atomic_sub(done, &d->opd_sync_rpcs_in_progress);
	CDEBUG((done > 2 ? D_HA : D_OTHER), "%s: %u changes, %u in progress,"
				     " %u in flight, %u done\n",
				     d->opd_obd->obd_name,
				     atomic_read(&d->opd_sync_changes),
				     atomic_read(&d->opd_sync_rpcs_in_progress),
				     atomic_read(&d->opd_sync_rpcs_in_flight),
				     done);

	osp_sync_check_for_work(d);

	/* wake up the thread if requested to stop:
	 * it might be waiting for in-progress to complete
	 */
	if (atomic_read(&d->opd_sync_rpcs_in_progress) == 0)
		wake_up(&d->opd_sync_waitq);

	EXIT;
}
1117
1118 /**
1119  * The core of the syncing mechanism.
1120  *
1121  * This is a callback called by the llog processing function. Essentially it
1122  * suspends llog processing until there is a record to process (it's supposed
1123  * to be committed locally). The function handles RPCs committed by the target
1124  * and cancels corresponding llog records.
1125  *
1126  * \param[in] env       LU environment provided by the caller
1127  * \param[in] llh       llog handle we're processing
1128  * \param[in] rec       current llog record
1129  * \param[in] data      callback data containing a pointer to the device
1130  *
1131  * \retval 0                    to ask the caller (llog_process()) to continue
1132  * \retval LLOG_PROC_BREAK      to ask the caller to break
1133  */
1134 static int osp_sync_process_queues(const struct lu_env *env,
1135                                    struct llog_handle *llh,
1136                                    struct llog_rec_hdr *rec,
1137                                    void *data)
1138 {
1139         struct osp_device       *d = data;
1140
1141         do {
1142                 if (!d->opd_sync_task) {
1143                         CDEBUG(D_HA, "stop llog processing\n");
1144                         return LLOG_PROC_BREAK;
1145                 }
1146
1147                 /* process requests committed by OST */
1148                 osp_sync_process_committed(env, d);
1149
1150                 /* if we there are changes to be processed and we have
1151                  * resources for this ... do now */
1152                 if (osp_sync_can_process_new(d, rec)) {
1153                         if (llh == NULL) {
1154                                 /* ask llog for another record */
1155                                 return 0;
1156                         }
1157                         osp_sync_process_record(env, d, llh, rec);
1158                         llh = NULL;
1159                         rec = NULL;
1160                 }
1161                 if (CFS_FAIL_PRECHECK(OBD_FAIL_CATALOG_FULL_CHECK) &&
1162                             cfs_fail_val != 1)
1163                         msleep(1 * MSEC_PER_SEC);
1164
1165                 wait_event_idle(d->opd_sync_waitq,
1166                                 !d->opd_sync_task ||
1167                                 osp_sync_can_process_new(d, rec) ||
1168                                 !list_empty(&d->opd_sync_committed_there));
1169         } while (1);
1170 }
1171
/* arguments handed from osp_sync_init() to the sync thread; freed by the
 * thread itself on exit */
struct osp_sync_args {
	struct osp_device	*osa_dev;	/* OSP device to sync */
	struct lu_env		 osa_env;	/* thread-private environment */
	struct completion	*osa_started;	/* signalled once thread runs */
};
1177
/**
 * OSP sync thread.
 *
 * This thread runs llog_cat_process() scanner calling our callback
 * to process llog records. in the callback we implement tricky
 * state machine as we don't want to start scanning of the llog again
 * and again, also we don't want to process too many records and send
 * too many RPCs a time. so, depending on current load (num of changes
 * being synced to OST) the callback can suspend awaiting for some
 * new conditions, like syncs completed.
 *
 * In order to process llog records left by previous boots and to allow
 * llog_process_thread() to find something (otherwise it'd just exit
 * immediately) we add a special GENERATATION record on each boot.
 *
 * \param[in] _args	a pointer to thread's arguments
 *
 * \retval 0		on success
 * \retval negative	negated errno on error
 */
static int osp_sync_thread(void *_args)
{
	struct osp_sync_args	*args = _args;
	struct osp_device	*d = args->osa_dev;
	struct llog_ctxt	*ctxt;
	struct obd_device	*obd = d->opd_obd;
	struct llog_handle	*llh;
	struct lu_env		*env = &args->osa_env;
	int			 rc, count;
	bool			 wrapped;

	ENTRY;

	/* let osp_sync_init() return: we're up and running */
	complete(args->osa_started);
again:
	ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
	if (ctxt == NULL) {
		CERROR("can't get appropriate context\n");
		GOTO(out, rc = -EINVAL);
	}

	llh = ctxt->loc_handle;
	if (llh == NULL) {
		CERROR("can't get llh\n");
		llog_ctxt_put(ctxt);
		GOTO(out, rc = -EINVAL);
	}

	/*
	 * Catalog processing stops when it processed last catalog record
	 * with index equal to the end of catalog bitmap. Or if it is wrapped,
	 * processing stops with index equal to the lgh_last_idx. We need to
	 * continue processing.
	 */
	d->opd_sync_last_catalog_idx = 0;
	do {
		int	size;

		wrapped = llog_cat_is_wrapped(llh);
		if (CFS_FAIL_CHECK(OBD_FAIL_OSP_CANT_PROCESS_LLOG)) {
			rc = -EINPROGRESS;
			goto next;
		}
		/* scan the catalog starting from where the previous pass
		 * stopped; osp_sync_process_queues() does the real work */
		rc = llog_cat_process(env, llh, osp_sync_process_queues, d,
				      d->opd_sync_last_catalog_idx, 0);

next:
		size = llog_max_idx(llh->lgh_hdr);

		/* processing reaches catalog bottom */
		if (d->opd_sync_last_catalog_idx == size)
			d->opd_sync_last_catalog_idx = 1;
		else if (wrapped)
			d->opd_sync_last_catalog_idx++;
		/* If catalog is wrapped we can`t predict last index of
		 * processing because lgh_last_idx could be changed.
		 * Starting form the next one. Index would be increased
		 * at llog_process_thread
		 */
	} while (rc == 0 && (wrapped ||
			     d->opd_sync_last_catalog_idx == 1));

	if (rc < 0) {
		if (rc == -EINPROGRESS) {
			/* can't access the llog now - OI scrub is trying to fix
			 * underlying issue. let's wait and try again */
			llog_cat_close(env, llh);
			rc = llog_cleanup(env, ctxt);
			if (rc)
				GOTO(out, rc);
			schedule_timeout_interruptible(cfs_time_seconds(5));
			goto again;
		}

		CERROR("%s: llog process with osp_sync_process_queues "
		       "failed: %d\n", d->opd_obd->obd_name, rc);
		GOTO(wait, rc);
	}
	LASSERTF(rc == 0 || rc == LLOG_PROC_BREAK,
		 "%u changes, %u in progress, %u in flight: %d\n",
		 atomic_read(&d->opd_sync_changes),
		 atomic_read(&d->opd_sync_rpcs_in_progress),
		 atomic_read(&d->opd_sync_rpcs_in_flight), rc);

	/* we don't expect llog_process_thread() to exit till umount */
	LASSERTF(kthread_should_stop(),
		 "%u changes, %u in progress, %u in flight\n",
		 atomic_read(&d->opd_sync_changes),
		 atomic_read(&d->opd_sync_rpcs_in_progress),
		 atomic_read(&d->opd_sync_rpcs_in_flight));

wait:
	/* wait till all the requests are completed */
	count = 0;
	while (atomic_read(&d->opd_sync_rpcs_in_progress) > 0) {
		osp_sync_process_committed(env, d);

		rc = wait_event_idle_timeout(
			d->opd_sync_waitq,
			atomic_read(&d->opd_sync_rpcs_in_progress) == 0,
			cfs_time_seconds(5));
		if (rc == 0)
			count++;
		/* ~50s without progress means something is stuck */
		LASSERTF(count < 10, "%s: %d %d %sempty\n",
			 d->opd_obd->obd_name,
			 atomic_read(&d->opd_sync_rpcs_in_progress),
			 atomic_read(&d->opd_sync_rpcs_in_flight),
			 list_empty(&d->opd_sync_committed_there) ? "" : "!");

	}

	llog_cat_close(env, llh);
	rc = llog_cleanup(env, ctxt);
	if (rc)
		CERROR("can't cleanup llog: %d\n", rc);
out:
	LASSERTF(atomic_read(&d->opd_sync_rpcs_in_progress) == 0,
		 "%s: %d %d %sempty\n", d->opd_obd->obd_name,
		 atomic_read(&d->opd_sync_rpcs_in_progress),
		 atomic_read(&d->opd_sync_rpcs_in_flight),
		 list_empty(&d->opd_sync_committed_there) ? "" : "!");

	lu_env_fini(env);

	if (xchg(&d->opd_sync_task, NULL) == NULL)
		/* already being waited for */
		wait_event_interruptible(d->opd_sync_waitq,
					 kthread_should_stop());
	OBD_FREE_PTR(args);

	RETURN(0);
}
1330
/**
 * Initialize llog.
 *
 * Initializes the llog. Specific llog to be used depends on the type of the
 * target OSP represents (OST or MDT). The function appends a new llog
 * record to mark the place where the records associated with this boot
 * start.
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] d		OSP device
 *
 * \retval 0		on success
 * \retval negative	negated errno on error
 */
static int osp_sync_llog_init(const struct lu_env *env, struct osp_device *d)
{
	struct osp_thread_info	*osi = osp_env_info(env);
	struct lu_fid		*fid = &osi->osi_fid;
	struct llog_handle	*lgh = NULL;
	struct obd_device	*obd = d->opd_obd;
	struct llog_ctxt	*ctxt;
	int			rc;

	ENTRY;

	LASSERT(obd);

	/*
	 * open llog corresponding to our OST
	 */
	OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
	obd->obd_lvfs_ctxt.dt = d->opd_storage;

	lu_local_obj_fid(fid, LLOG_CATALOGS_OID);

	/* look up the catalog id recorded for our index in CATALOGS */
	rc = llog_osd_get_cat_list(env, d->opd_storage, d->opd_index, 1,
				   &osi->osi_cid, fid);
	if (rc < 0) {
		if (rc != -EFAULT) {
			CERROR("%s: can't get id from catalogs: rc = %d\n",
			       obd->obd_name, rc);
			RETURN(rc);
		}

		/* After sparse OST indices is supported, the CATALOG file
		 * may become a sparse file that results in failure on
		 * reading. Skip this error as the llog will be created
		 * later */
		memset(&osi->osi_cid, 0, sizeof(osi->osi_cid));
		rc = 0;
	}

	CDEBUG(D_INFO, "%s: init llog for %d - catid "DFID"\n",
	       obd->obd_name, d->opd_index, PLOGID(&osi->osi_cid.lci_logid));

	rc = llog_setup(env, obd, &obd->obd_olg, LLOG_MDS_OST_ORIG_CTXT,
			d->opd_storage->dd_lu_dev.ld_obd,
			&llog_common_cat_ops);
	if (rc)
		RETURN(rc);

	ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
	LASSERT(ctxt);

	/* a non-zero stored id means a catalog may already exist - try to
	 * reuse it; a bad or missing one falls through to re-creation */
	if (likely(logid_id(&osi->osi_cid.lci_logid) != 0)) {
		struct lu_fid fid_temp;

		if (CFS_FAIL_CHECK(OBD_FAIL_OSP_INVALID_LOGID)) {
			memset(&osi->osi_cid, 0, sizeof(osi->osi_cid));
			logid_set_id(&osi->osi_cid.lci_logid, cfs_fail_val);
		}

		logid_to_fid(&osi->osi_cid.lci_logid, &fid_temp);
		if (fid_is_sane(&fid_temp)) {
			rc = llog_open(env, ctxt, &lgh, &osi->osi_cid.lci_logid,
				       NULL, LLOG_OPEN_EXISTS);

			/* re-create llog if it is missing */
			if (rc == -ENOENT)
				logid_set_id(&osi->osi_cid.lci_logid, 0);
			else if (rc < 0)
				GOTO(out_cleanup, rc);
		} else {
			CERROR("%s: the catid "DFID" for init llog %d is bad\n",
			       obd->obd_name, PFID(&fid_temp), d->opd_index);

			/* it will be recreated later */
			logid_set_id(&osi->osi_cid.lci_logid, 0);
		}
	}

	/* no usable catalog found above - create a fresh one */
	if (unlikely(logid_id(&osi->osi_cid.lci_logid) == 0)) {
		rc = llog_open_create(env, ctxt, &lgh, NULL, NULL);
		if (rc < 0)
			GOTO(out_cleanup, rc);
		osi->osi_cid.lci_logid = lgh->lgh_id;
	}

	LASSERT(lgh != NULL);
	ctxt->loc_handle = lgh;

	rc = llog_init_handle(env, lgh, LLOG_F_IS_CAT | LLOG_F_RM_ON_ERR, NULL);
	if (rc)
		GOTO(out_close, rc);

	/* persist the (possibly new) catalog id back into CATALOGS */
	rc = llog_osd_put_cat_list(env, d->opd_storage, d->opd_index, 1,
				   &osi->osi_cid, fid);
	if (rc)
		GOTO(out_close, rc);

	/*
	 * put a mark in the llog till which we'll be processing
	 * old records restless
	 */
	d->opd_sync_generation.mnt_cnt = ktime_get_ns();
	d->opd_sync_generation.conn_cnt = ktime_get_ns();

	osi->osi_hdr.lrh_type = LLOG_GEN_REC;
	osi->osi_hdr.lrh_len = sizeof(osi->osi_gen);

	memcpy(&osi->osi_gen.lgr_gen, &d->opd_sync_generation,
	       sizeof(osi->osi_gen.lgr_gen));

	rc = llog_cat_add(env, lgh, &osi->osi_gen.lgr_hdr, &osi->osi_cookie);
	if (rc < 0)
		GOTO(out_close, rc);
	llog_ctxt_put(ctxt);
	RETURN(0);
out_close:
	llog_cat_close(env, lgh);
out_cleanup:
	llog_cleanup(env, ctxt);
	RETURN(rc);
}
1465
1466 /**
1467  * Cleanup llog used for syncing.
1468  *
1469  * Closes and cleanups the llog. The function is called when the device is
1470  * shutting down.
1471  *
1472  * \param[in] env       LU environment provided by the caller
1473  * \param[in] d         OSP device
1474  */
1475 static void osp_sync_llog_fini(const struct lu_env *env, struct osp_device *d)
1476 {
1477         struct llog_ctxt *ctxt;
1478
1479         ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
1480         if (ctxt) {
1481                 llog_cat_close(env, ctxt->loc_handle);
1482                 llog_cleanup(env, ctxt);
1483         }
1484 }
1485
/**
 * Initialization of the sync component of OSP.
 *
 * Initializes the llog and starts a new thread to handle the changes to
 * the remote target (OST or MDT). On a read-only backend only the data
 * structures are initialized and no thread is started.
 *
 * \param[in] env	LU environment provided by the caller
 * \param[in] d		OSP device
 *
 * \retval 0		on success
 * \retval negative	negated errno on error
 */
int osp_sync_init(const struct lu_env *env, struct osp_device *d)
{
	struct task_struct	*task;
	struct osp_sync_args	*args;
	DECLARE_COMPLETION_ONSTACK(started);
	int			 rc;

	ENTRY;

	d->opd_sync_max_rpcs_in_flight = OSP_MAX_RPCS_IN_FLIGHT;
	d->opd_sync_max_rpcs_in_progress = OSP_MAX_RPCS_IN_PROGRESS;
	d->opd_sync_max_changes = OSP_MAX_SYNC_CHANGES;
	spin_lock_init(&d->opd_sync_lock);
	init_waitqueue_head(&d->opd_sync_waitq);
	init_waitqueue_head(&d->opd_sync_barrier_waitq);
	INIT_LIST_HEAD(&d->opd_sync_in_flight_list);
	INIT_LIST_HEAD(&d->opd_sync_committed_there);

	/* read-only storage: nothing to log or replay */
	if (d->opd_storage->dd_rdonly)
		RETURN(0);

	/* args are freed by the sync thread itself on exit */
	OBD_ALLOC_PTR(args);
	if (!args)
		GOTO(err_id, rc = -ENOMEM);
	args->osa_dev = d;
	args->osa_started = &started;
	/*
	 * initialize llog storing changes
	 */
	rc = osp_sync_llog_init(env, d);
	if (rc) {
		CERROR("%s: can't initialize llog: rc = %d\n",
		       d->opd_obd->obd_name, rc);
		GOTO(err_id, rc);
	}

	rc = lu_env_init(&args->osa_env, LCT_LOCAL);
	if (rc) {
		CERROR("%s: can't initialize env: rc = %d\n",
		       d->opd_obd->obd_name, rc);
		GOTO(err_llog, rc);
	}

	/*
	 * Start synchronization thread
	 */
	task = kthread_create(osp_sync_thread, args, "osp-syn-%u-%u",
			   d->opd_index, d->opd_group);
	if (IS_ERR(task)) {
		rc = PTR_ERR(task);
		CERROR("%s: cannot start sync thread: rc = %d\n",
		       d->opd_obd->obd_name, rc);
		lu_env_fini(&args->osa_env);
		GOTO(err_llog, rc);
	}
	d->opd_sync_task = task;
	wake_up_process(task);
	/* don't return until the thread is actually running */
	wait_for_completion(&started);

	RETURN(0);
err_llog:
	osp_sync_llog_fini(env, d);
err_id:
	/* args may be NULL if its allocation failed */
	if (args)
		OBD_FREE_PTR(args);
	return rc;
}
1565
1566 /**
1567  * Stop the syncing thread.
1568  *
1569  * Asks the syncing thread to stop and wait until it's stopped.
1570  *
1571  * \param[in] d         OSP device
1572  *
1573  * \retval              0
1574  */
1575 int osp_sync_fini(struct osp_device *d)
1576 {
1577         struct task_struct *task;
1578
1579         ENTRY;
1580
1581         task = xchg(&d->opd_sync_task, NULL);
1582         if (task)
1583                 kthread_stop(task);
1584
1585         RETURN(0);
1586 }
1587
/* per-transaction context for the local "last committed" commit callback;
 * allocated in osp_sync_add_commit_cb(), freed in osp_sync_local_commit_cb() */
struct osp_last_committed_cb {
	struct dt_txn_commit_cb	 ospc_cb;	/* embedded callback handle */
	struct osp_device	*ospc_dev;	/* owning OSP device */
	__u64			 ospc_transno;	/* local id taken from
						 * opd_sync_last_used_id */
};
1593
/*
 * Commit callback registered by osp_sync_add_commit_cb().
 *
 * Runs once the local transaction carrying the callback has committed:
 * advances the last-committed watermark, wakes the sync thread to process
 * newly-committed llog records, then drops the references taken at
 * registration time and frees the callback context.
 */
static void osp_sync_local_commit_cb(struct lu_env *env, struct thandle *th,
				     struct dt_txn_commit_cb *dcb, int err)
{
	struct osp_last_committed_cb	*cb;
	struct osp_device		*d;

	cb = container_of(dcb, struct osp_last_committed_cb, ospc_cb);
	d = cb->ospc_dev;

	CDEBUG(D_HA, "%s: %llu committed\n", d->opd_obd->obd_name,
	       cb->ospc_transno);

	/* ids are handed out in increasing order, but commit callbacks may
	 * fire out of order - only move the watermark forward */
	spin_lock(&d->opd_sync_lock);
	if (cb->ospc_transno > d->opd_sync_last_committed_id)
		d->opd_sync_last_committed_id = cb->ospc_transno;
	spin_unlock(&d->opd_sync_lock);

	osp_sync_check_for_work(d);
	/* drop the device reference taken in osp_sync_add_commit_cb() */
	lu_device_put(osp2lu_dev(d));
	/* waiters (e.g. device teardown) block until all registered
	 * callbacks have completed */
	if (atomic_dec_and_test(&d->opd_commits_registered))
		wake_up(&d->opd_sync_waitq);

	OBD_FREE_PTR(cb);
}
1618
1619 static int osp_sync_add_commit_cb(const struct lu_env *env,
1620                                   struct osp_device *d, struct thandle *th)
1621 {
1622         struct osp_last_committed_cb    *cb;
1623         struct dt_txn_commit_cb         *dcb;
1624         int                              rc = 0;
1625
1626         OBD_ALLOC_PTR(cb);
1627         if (cb == NULL)
1628                 return -ENOMEM;
1629         cb->ospc_dev = d;
1630         dcb = &cb->ospc_cb;
1631         dcb->dcb_func = osp_sync_local_commit_cb;
1632         spin_lock(&d->opd_sync_lock);
1633         cb->ospc_transno = ++d->opd_sync_last_used_id;
1634         spin_unlock(&d->opd_sync_lock);
1635
1636         rc = dt_trans_cb_add(th, dcb);
1637         CDEBUG(D_HA, "%s: add commit cb at %lluns, next at %lluns, rc = %d\n",
1638                d->opd_obd->obd_name, ktime_get_ns(),
1639                ktime_to_ns(d->opd_sync_next_commit_cb), rc);
1640
1641         if (likely(rc == 0)) {
1642                 lu_device_get(osp2lu_dev(d));
1643                 atomic_inc(&d->opd_commits_registered);
1644         } else
1645                 OBD_FREE_PTR(cb);
1646
1647         return rc;
1648 }
1649
1650 /* add the commit callback every second */
1651 int osp_sync_add_commit_cb_1s(const struct lu_env *env, struct osp_device *d,
1652                               struct thandle *th)
1653 {
1654         ktime_t now = ktime_get();
1655         bool add = false;
1656
1657         /* fast path */
1658         if (ktime_before(now, d->opd_sync_next_commit_cb))
1659                 return 0;
1660
1661         spin_lock(&d->opd_sync_lock);
1662         if (ktime_before(d->opd_sync_next_commit_cb, now)) {
1663                 add = true;
1664                 d->opd_sync_next_commit_cb = ktime_add_ns(now, NSEC_PER_SEC);
1665         }
1666         spin_unlock(&d->opd_sync_lock);
1667
1668         if (!add)
1669                 return 0;
1670
1671         return osp_sync_add_commit_cb(env, d, th);
1672 }
1673
1674 /*
1675  * generate an empty transaction and hook the commit callback in
1676  * then force transaction commit
1677  */
1678 void osp_sync_force(const struct lu_env *env, struct osp_device *d)
1679 {
1680         struct thandle *th;
1681         int rc;
1682
1683         th = dt_trans_create(env, d->opd_storage);
1684         if (IS_ERR(th)) {
1685                 CERROR("%s: can't sync\n", d->opd_obd->obd_name);
1686                 return;
1687         }
1688         rc = dt_trans_start_local(env, d->opd_storage, th);
1689         if (rc == 0) {
1690                 CDEBUG(D_OTHER, "%s: sync forced, %d changes\n",
1691                        d->opd_obd->obd_name,
1692                        atomic_read(&d->opd_sync_changes));
1693                 rc = osp_sync_add_commit_cb(env, d, th);
1694                 dt_trans_stop(env, d->opd_storage, th);
1695         }
1696
1697         dt_commit_async(env, d->opd_storage);
1698 }