/***************************************************************************** Copyright (c) 1995, 2019, Oracle and/or its affiliates. All Rights Reserved. Copyright (c) 2009, Google Inc. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the Free Software Foundation. This program is also distributed with certain software (including but not limited to OpenSSL) that is licensed under separate terms, as designated in a particular file or component or in included license documentation. The authors of MySQL hereby grant you an additional permission to link the program and your derivative works with the separately licensed software that they have included with MySQL. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0, for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA Portions of this file contain modifications contributed and copyrighted by Google, Inc. Those modifications are gratefully acknowledged and are described briefly in the InnoDB documentation. The contributions by Google are incorporated with their permission, and subject to the conditions contained in the file COPYING.Google. *****************************************************************************/ /**************************************************/ /** @file log/log0write.cc Redo log writing and flushing, including functions for: 1. Waiting for the log written / flushed up to provided lsn. 2. Redo log background threads (except the log checkpointer). @author Paweł Olchawa *******************************************************/ #ifndef UNIV_HOTBACKUP #include #include "ha_prototypes.h" #include #include "arch0arch.h" #include "buf0buf.h" #include "buf0flu.h" #include "dict0boot.h" #include "dict0stats_bg.h" #endif /* !UNIV_HOTBACKUP */ #include "fil0fil.h" #include "log0log.h" #include "log0meb.h" #ifndef UNIV_HOTBACKUP #include "log0recv.h" #include "mem0mem.h" #include "mysqld.h" /* server_uuid */ #include "srv0mon.h" #include "srv0srv.h" #include "srv0start.h" #include "sync0sync.h" #include "trx0roll.h" #include "trx0sys.h" #include "trx0trx.h" /**************************************************/ /** @page PAGE_INNODB_REDO_LOG_THREADS Background redo log threads Three background log threads are responsible for writes of new data to disk: -# [Log writer](@ref sect_redo_log_writer) - writes from the log buffer or write-ahead buffer to OS buffers. -# [Log flusher](@ref sect_redo_log_flusher) - writes from OS buffers to disk (fsyncs). -# [Log write_notifier](@ref sect_redo_log_write_notifier) - notifies user threads about completed writes to disk (when write_lsn is advanced). -# [Log flush_notifier](@ref sect_redo_log_flush_notifier) - notifies user threads about completed fsyncs (when flushed_to_disk_lsn is advanced). Two background log threads are responsible for checkpoints (reclaiming space in log files): -# [Log closer](@ref sect_redo_log_closer) - tracks up to which lsn all dirty pages have been added to flush lists (wrt. oldest_modification). -# [Log checkpointer](@ref sect_redo_log_checkpointer) - determines @ref subsect_redo_log_available_for_checkpoint_lsn and writes checkpoints. 
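From the perspective of a user thread, all the threads listed above are hidden behind a
single call. The following sketch is illustrative only (the end_lsn value and the
surrounding transaction logic are assumed); it shows the typical interaction on COMMIT,
using log_write_up_to() which is defined later in this file:

@code{.cpp}
/* Illustrative sketch - not real server code. A user thread which has already
copied its log records to the log buffer (its writes end at end_lsn) asks for
durability and then only waits; the log writer thread performs the write, the
log flusher performs the fsync(), and one of the notifier threads wakes this
thread up afterwards. */
void user_thread_commit_sketch(log_t &log, lsn_t end_lsn) {
  /* flush_to_disk=true: wait until log.flushed_to_disk_lsn >= end_lsn
  (with innodb_flush_log_at_trx_commit=1 this is what COMMIT requires). */
  const Wait_stats stats = log_write_up_to(log, end_lsn, true);

  /* stats describe how the waiting was performed (spinning vs. sleeping). */
  (void)stats;
}
@endcode
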
@section sect_redo_log_writer Thread: log writer This thread is responsible for writing data from the log buffer to disk (to the log files). However, it's not responsible for doing fsync() calls. It copies data to system buffers. It is the log flusher thread, which is responsible for doing fsync(). There are following points that need to be addressed by the log writer thread: -# %Find out how much data is ready in the log buffer, which is concurrently filled in by multiple user threads. In the log recent written buffer, user threads set links for every finished write to the log buffer. Each such link is represented as a number of bytes written, starting from a _start_lsn_. The link is stored in the slot assigned to the _start_lsn_ of the write. The log writer thread tracks links in the recent written buffer, traversing a connected path created by the links. It stops when it encounters a missing outgoing link. In such case the next fragment of the log buffer is still being written (or the maximum assigned lsn was reached). It also stops as soon as it has traversed by more than 4kB, in which case it is enough for a next write (unless we decided again to do fsyncs from inside the log writer thread). After traversing links and clearing slots occupied by the links (in the recent written buffer), the log writer thread updates @ref subsect_redo_log_buf_ready_for_write_lsn. @diafile storage/innobase/log/recent_written_buffer.dia "Example of links in the recent written buffer" @note The log buffer has no holes up to the _log.buf_ready_for_write_lsn_ (all concurrent writes for smaller lsn have been finished). If there were no links to traverse, _log.buf_ready_for_write_lsn_ was not advanced and the log writer thread needs to wait. In such case it first uses spin delay and afterwards switches to wait on the _writer_event_. -# Prepare log blocks for writing - update their headers and footers. The log writer thread detects completed log blocks in the log buffer. Such log blocks will not receive any more writes. Hence their headers and footers could be easily updated (e.g. checksum is calculated). @diafile storage/innobase/log/log_writer_complete_blocks.dia "Complete blocks are detected and written" If any complete blocks were detected, they are written directly from the log buffer (after updating headers and footers). Afterwards the log writer thread retries the previous step before making next decisions. For each write consisting of one or more complete blocks, the _MONITOR_LOG_FULL_BLOCK_WRITES_ is incremented by one. @note There is a special case - when write-ahead is required, data needs to be copied to the write-ahead buffer and the last incomplete block could also be copied and written. For details read below and check the next point. The special case is also for the last, incomplete log block. Note that @ref subsect_redo_log_buf_ready_for_write_lsn could be in the middle of such block. In such case, next writes are likely incoming to the log block. @diafile storage/innobase/log/log_writer_incomplete_block.dia "Incomplete block is copied" For performance reasons we often need to write the last incomplete block. That's because it turned out, that we should try to reclaim user threads as soon as possible, allowing them to handle next transactions and provide next data. 
In such case: - the last log block is first copied to the dedicated buffer, up to the @ref subsect_redo_log_buf_ready_for_write_lsn, - the remaining part of the block in the dedicated buffer is filled in with _0x00_ bytes, - header fields are updated, - checksum is calculated and stored in the block's footer, - the block is written from the dedicated buffer, - the _MONITOR_LOG_PARTIAL_BLOCK_WRITES_ is incremented by one. @note The write-ahead buffer is used as the dedicated buffer for writes of the last incomplete block. That's because, whenever we needed a next write-ahead (even for complete blocks), we possibly can also write the last incomplete block during the write-ahead. The monitor counters for full/partial block writes are incremented before the logic related to writing ahead is applied. Hence the counter of partial block writes is not incremented if a full block write was possible (in which case only requirement for write-ahead could be the reason of writing the incomplete block). @remarks The log writer thread never updates [first_rec_group](@ref a_redo_log_block_first_rec_group) field. It has to be set by user threads before the block is allowed to be written. That's because only user threads know where are the boundaries between groups of log records. The user thread which has written data ending at lsn which needs to be pointed as _first_rec_group_, is the one responsible for setting the field. User thread which has written exactly up to the end of log block, is considered ending at lsn after the header of the next log block. That's because after such write, the log writer is allowed to write the next empty log block (_buf_ready_for_write_lsn_ points then to such lsn). The _first_rec_group_ field is updated before the link is added to the log recent written buffer. -# Avoid read-on-write issue. The log writer thread is also responsible for writing ahead to avoid the read-on-write problem. It tracks up to which point the write ahead has been done. When a write would go further: - If we were trying to write more than size of single write-ahead region, we limit the write to completed write-ahead sized regions, and postpone writing the last fragment for later (retrying with the first step and updating the _buf_ready_for_write_lsn_). @note If we needed to write complete regions of write-ahead bytes, they are ready in the log buffer and could be written directly from there. Such writes would not cause read-on-write problem, because size of the writes is divisible by write-ahead region. - Else, we copy data to special write-ahead buffer, from which we could safely write the whole single write-ahead sized region. After copying the data, the write-ahead buffer is completed with _0x00_ bytes. @note The write-ahead buffer is also used for copying the last incomplete log block, which was described in the previous point. -# Update write_lsn. After doing single write (single fil_io()), the log writer thread updates @ref subsect_redo_log_write_lsn and fallbacks to its main loop. That's because a lot more data could be prepared in meantime, as the write operation could take significant time. That's why the general rule is that after doing fil_io(), we need to update @ref subsect_redo_log_buf_ready_for_write_lsn before making next decisions on how much to write within next fil_io() call. -# Notify [log writer_notifier thread](@ref sect_redo_log_write_notifier) using os_event_set on the _write_notifier_event_. 
@see @ref sect_redo_log_waiting_for_writer -# Notify [log flusher thread](@ref sect_redo_log_flusher) using os_event_set() on the _flusher_event_. @section sect_redo_log_flusher Thread: log flusher The log flusher thread is responsible for doing fsync() of the log files. When the fsync() calls are finished, the log flusher thread updates the @ref subsect_redo_log_flushed_to_disk_lsn and notifies the [log flush_notifier thread](@ref sect_redo_log_flush_notifier) using os_event_set() on the _flush_notifier_event_. @remarks Small optimization has been applied - if there was only a single log block flushed since the previous flush, then the log flusher thread notifies user threads directly (instead of notifying the log flush_notifier thread). Impact of the optimization turned out to be positive for some scenarios and negative for other, so further investigation is required. However, because the change seems to make sense from logical point of view, it has been preserved. If the log flusher thread detects that none of the conditions is satisfied, it simply waits and retries the checks. After initial spin delay, it waits on the _flusher_event_. @section sect_redo_log_flush_notifier Thread: log flush_notifier The log flush_notifier thread is responsible for notifying all user threads that are waiting for @ref subsect_redo_log_flushed_to_disk_lsn >= lsn, when the condition is satisfied. @remarks It also notifies when it is very likely to be satisfied (lsn values are within the same log block). It is allowed to make mistakes and it is responsibility of the notified user threads to ensure, that the _flushed_to_disk_lsn_ is advanced sufficiently. The log flush_notifier thread waits for the advanced _flushed_to_disk_lsn_ in loop, using os_event_wait_time_low() on the _flush_notifier_event_. When it gets notified by the [log flusher](@ref sect_redo_log_flusher), it ensures that the _flushed_to_disk_lsn_ has been advanced (single new byte is enough though). It notifies user threads waiting on all events between (inclusive): - event for a block with the previous value of _flushed_to_disk_lsn_, - event for a block containing the new value of _flushed_to_disk_lsn_. Events are assigned per blocks in the circular array of events using mapping: event_slot = (lsn-1) / OS_FILE_LOG_BLOCK_SIZE % S where S is size of the array (number of slots with events). Each slot has single event, which groups all user threads waiting for flush up to any lsn within the same log block (or log block with number greater by S*i). @diafile storage/innobase/log/log_notifier_notifications.dia "Notifications executed on slots" Internal mutex in event is used, to avoid missed notifications (these would be worse than the false notifications). However, there is also maximum timeout defined for the waiting on the event. After the timeout was reached (default: 1ms), the _flushed_to_disk_lsn_ is re-checked in the user thread (just in case). @note Because flushes are possible for @ref subsect_redo_log_write_lsn set in the middle of log block, it is likely that the same slot for the same block will be notified multiple times in a row. We tried delaying notifications for the last block, but the results were only worse then. It turned out that latency is extremely important here. 
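The slot mapping above is cheap enough to compute on every notification. A minimal
sketch follows (the helper name is illustrative; the waiting code later in this file
applies the same formula, with the modulo replaced by a bit-mask because the number
of slots is a power of two):

@code{.cpp}
/* Illustrative sketch of the event slot mapping described above. All lsn
values belonging to the same log block map to the same slot, and slots are
reused in a circular fashion. Assumes slots is a power of two, so that
"% slots" can be expressed as a bit-mask. */
static inline size_t example_event_slot(lsn_t lsn, size_t slots) {
  return ((lsn - 1) / OS_FILE_LOG_BLOCK_SIZE) & (slots - 1);
}
@endcode
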
@see @ref sect_redo_log_waiting_for_flusher @section sect_redo_log_write_notifier Thread: log write_notifier The log write_notifier thread is responsible for notifying all user threads that are waiting for @ref subsect_redo_log_write_lsn >= lsn, when the condition is satisfied. @remarks It also notifies when it is very likely to be satisfied (lsn values are within the same log block). It is allowed to make mistakes and it is responsibility of the notified user threads to ensure, that the _write_lsn_ is advanced sufficiently. The log write_notifier thread waits for the advanced _write_lsn_ in loop, using os_event_wait_time_low() on the _write_notifier_event_. When it gets notified (by the [log writer](@ref sect_redo_log_writer)), it ensures that the _write_lsn_ has been advanced (single new byte is enough). Then it notifies user threads waiting on all events between (inclusive): - event for a block with the previous value of _write_lsn_, - event for a block containing the new value of _write_lsn_. Events are assigned per blocks in the circular array of events using mapping: event_slot = (lsn-1) / OS_FILE_LOG_BLOCK_SIZE % S where S is size of the array (number of slots with events). Each slot has single event, which groups all user threads waiting for write up to any lsn within the same log block (or log block with number greater by S*i). Internal mutex in event is used, to avoid missed notifications (these would be worse than the false notifications). However, there is also maximum timeout defined for the waiting on the event. After the timeout was reached (default: 1ms), the _write_lsn_ is re-checked in the user thread (just in case). @note Because writes are possible for @ref subsect_redo_log_write_lsn set in the middle of log block, it is likely that the same slot for the same block will be notified multiple times in a row. @see @ref sect_redo_log_waiting_for_writer @section sect_redo_log_closer Thread: log closer The log closer thread is responsible for tracking up to which lsn, all dirty pages have already been added to flush lists. It traverses links in the log recent closed buffer, following a connected path, which is created by the links. The traversed links are removed and afterwards the @ref subsect_redo_log_buf_dirty_pages_added_up_to_lsn is updated. Links are stored inside slots in a ring buffer. When link is removed, the related slot becomes empty. Later it is reused for link pointing from larger lsn value. The log checkpointer thread must not write a checkpoint for lsn larger than _buf_dirty_pages_added_up_to_lsn_. That is because some user thread might be in state where it is just after writing to the log buffer, but before adding its dirty pages to flush lists. The dirty pages could have modifications protected by log records, which start at lsn, which would be logically deleted by such checkpoint. @section sect_redo_log_checkpointer Thread: log checkpointer The log checkpointer thread is responsible for: -# Checking if a checkpoint write is required (to decrease checkpoint age before it gets too big). -# Checking if synchronous flush of dirty pages should be forced on page cleaner threads, because of space in redo log or age of the oldest page. -# Writing checkpoints (it's the only thread allowed to do it!). This thread has been introduced at the very end. It was not required for the performance, but it makes the design more consistent after we have introduced other log threads. That's because user threads are not doing any writes to the log files themselves then. 
Previously they were writing checkpoints when needed, which required synchronization between them. The log checkpointer thread updates log.available_for_checkpoint_lsn, which is calculated as: min(log.buf_dirty_pages_added_up_to_lsn, max(0, oldest_lsn - L)) where: - oldest_lsn = min(oldest modification of the earliest page from each flush list), - L is a number of slots in the log recent closed buffer. The special case is when there is no dirty page in flush lists - then it's basically set to the _log.buf_dirty_pages_added_up_to_lsn_. @note Note that previously, all user threads were trying to calculate this lsn concurrently, causing contention on flush_list mutex, which is required to read the _oldest_modification_ of the earliest added page. Now the lsn is updated in single thread. @section sect_redo_log_waiting_for_writer Waiting until log has been written to disk User has to wait until the [log writer thread](@ref sect_redo_log_writer) has written data from the log buffer to disk for lsn >= _end_lsn_ of log range used by the user, which is true when: write_lsn >= end_lsn The @ref subsect_redo_log_write_lsn is updated by the log writer thread. The waiting is solved using array of events. The user thread waiting for a given lsn, waits using the event at position: slot = (end_lsn - 1) / OS_FILE_LOG_BLOCK_SIZE % S where _S_ is number of entries in the array. Therefore the event corresponds to log block which contains the _end_lsn_. The [log write_notifier thread](@ref sect_redo_log_write_notifier) tracks how the @ref subsect_redo_log_write_lsn is advanced and notifies user threads for consecutive slots. @remarks When the _write_lsn_ is in the middle of log block, all user threads waiting for lsn values within the whole block are notified. When user thread is notified, it checks if the current value of the _write_lsn_ is sufficient and retries waiting if not. To avoid missed notifications, event's internal mutex is used. @section sect_redo_log_waiting_for_flusher Waiting until log has been flushed to disk If a user need to assure the log persistence in case of crash (e.g. on COMMIT of a transaction), he has to wait until [log flusher](@ref sect_redo_log_flusher) has flushed log files to disk for lsn >= _end_lsn_ of log range used by the user, which is true when: flushed_to_disk_lsn >= end_lsn The @ref subsect_redo_log_flushed_to_disk_lsn is updated by the log flusher thread. The waiting is solved using array of events. The user thread waiting for a given lsn, waits using the event at position: slot = (end_lsn - 1) / OS_FILE_LOG_BLOCK_SIZE % S where _S_ is number of entries in the array. Therefore the event corresponds to log block which contains the _end_lsn_. The [log flush_notifier thread](@ref sect_redo_log_flush_notifier) tracks how the @ref subsect_redo_log_flushed_to_disk_lsn is advanced and notifies user threads for consecutive slots. @remarks When the _flushed_to_disk_lsn_ is in the middle of log block, all user threads waiting for lsn values within the whole block are notified. When user thread is notified, it checks if the current value of the _flushed_to_disk_lsn_ is sufficient and retries waiting if not. To avoid missed notifications, event's internal mutex is used. @page PAGE_INNODB_REDO_LOG_FORMAT Format of redo log @section sect_redo_log_format_overview Overview Redo log contains multiple log files, each has the same format. Consecutive files have data for consecutive ranges of lsn values. When a file ends at _end_lsn_, the next log file begins at the _end_lsn_. 
There is a fixed number of log files, they are re-used in circular manner. That is, for the last log file, the first log file is a successor. @note A single big file would remain fully cached for some of file systems, even if only a small fragment of the file is being modified. Hence multiple log files are used to make evictions always possible. Keep in mind though that log files are used in circular manner (lsn modulo size of log files, when size is calculated except the log file headers). The default log file names are: _ib_logfile0_, _ib_logfile1_, ... The maximum allowed number of log files is 100. The special file name _ib_logfile101_ is used when new log files are created and it is used instead of _ib_logfile0_ until all the files are ready. Afterwards the _ib_logfile101_ is atomically renamed to _ib_logfile0_ and files are considered successfully created then. @section sect_redo_log_format_file Log file format @subsection subsect_redo_log_format_header Header of log file %Log file starts with a header of _LOG_FILE_HDR_SIZE_ bytes. It contains: - Initial block of _OS_FILE_LOG_BLOCK_SIZE_ (512) bytes, which has: - Binding of an offset within the file to the lsn value. This binding allows to map any lsn value which is represented within the file to corresponding lsn value. - Format of redo log - remains the same as before the patch. - Checksum of the block. - Two checkpoint blocks - _LOG_CHECKPOINT_1_ and _LOG_CHECKPOINT_2_. Each checkpoint block contains _OS_FILE_LOG_BLOCK_SIZE_ bytes: - _checkpoint_lsn_ - lsn to start recovery at. @note In earlier versions than 8.0, checkpoint_lsn pointed directly to the beginning of the first log record group, which should be recovered (but still the related page could have been flushed). However since 8.0 this value might point to some byte inside a log record. In such case, recovery is supposed to skip the group of log records which contains the checkpoint lsn (and start at the beginning of the next). We cannot easily determine beginning of the next group. There are two cases: - block with _checkpoint_lsn_ has no beginning of group at all (first_rec_group = 0) - then we search forward for the first block that has non-zero first_rec_group and there we have the next group's start, - block with _checkpoint_lsn_ has one or more groups of records starting inside the block - then we start parsing at the first group that starts in the block and keep parsing consecutive groups until we passed checkpoint_lsn; we don't apply these groups of records (we must not because of fil renames); after we passed checkpoint_lsn, the next group that starts is the one we were looking for to start recovery at; it is possible that the next group begins in the next block (if there was no more groups starting after checkpoint_lsn within the block) - _checkpoint_no_ - checkpoint number - when checkpoint is being written, a next checkpoint number is assigned. - _log.buf_size_ - size of the log buffer when the checkpoint write was started. It remains a mystery, why do we need that. It's neither used by the recovery, nor required for MEB. Some rumours say that maybe it could be useful for auto-config external tools to detect what configuration of MySQL should be used. @note Note that size of the log buffer could be decreased in runtime, after writing the checkpoint (which was not the case, when this field was being introduced). There are two checkpoint headers, because they are updated alternately. 
In case of crash in the middle of any such update, the alternate header would remain valid (so it's the same reason for which double write buffer is used for pages). @remarks Each log file has its own header. However checkpoints are read only from the first log file (_ib_logfile0_) during recovery. @subsection subsect_redo_log_format_blocks Log blocks After the header, there are consecutive log blocks. Each log block has the same format and consists of _OS_FILE_LOG_BLOCK_SIZE_ bytes (512). These bytes are enumerated by lsn values. @note Bytes used by [headers of log files](@ref subsect_redo_log_format_header) are NOT included in lsn sequence. Each log block contains: - header - _LOG_BLOCK_HDR_SIZE_ bytes (12): - @anchor a_redo_log_block_hdr_no hdr_no This is a block number. Consecutive blocks have consecutive numbers. Hence this is basically lsn divided by _OS_FILE_LOG_BLOCK_SIZE_. However it is also wrapped at 1G (due to limited size of the field). It should be possible to wrap it at 2G (only the single flush bit is reserved as the highest bit) but for historical reasons it is 1G. - @anchor a_redo_log_block_flush_bit flush_bit This is a single bit stored as the highest bit of hdr_no. The bit is skipped when calculating block number. It is set for the first block of multiple blocks written in a single call to fil_io(). It was supposed to help to filter out writes which were not atomic. When the flush bit is read from disk, it means that up to this lsn, all previous log records have been fully written from the log buffer to OS buffers. That's because previous calls to fil_io() had to be finished, before a fil_io() call for current block was started. The wrong assumption was that we can trust those log records then. Note, we have no guarantee that order of writes is preserved by disk controller. That's why only after fsync() call is finished, one could be sure, that data is fully written (up to the write_lsn at which fsync() was started). During recovery, when the flush bit is encountered, *contiguous_lsn is updated, but then the updated lsn seems unused... It seems that there is no real benefit from the flush bit at all, and even in 5.7 it was completely ignored during the recovery. - @anchor a_redo_log_block_data_len data_len Number of bytes within the log block. Possible values: - _0_ - this is an empty block (end the recovery). - _OS_FILE_LOG_BLOCK_SIZE_ - this is a full block. - value within [_LOG_BLOCK_HDR_SIZE_, _OS_FILE_LOG_BLOCK_SIZE_ - _LOG_BLOCK_TRL_SIZE_), which means that this is the last block and it is an incomplete block. This could be then considered an offset, which points to the end of the data within the block. This value includes _LOG_BLOCK_HDR_SIZE_ bytes of the header. - @anchor a_redo_log_block_first_rec_group first_rec_group Offset within the log block to the beginning of the first group of log records that starts within the block or 0 if none starts. This offset includes _LOG_BLOCK_HDR_SIZE_ bytes of the header. - @anchor a_redo_log_block_checkpoint_no checkpoint_no Checkpoint number of a next checkpoint write. Set by the log writer thread just before a write starts for the block. It could be used during recovery to detect that we have read old block of redo log (tail) because of the wrapped log files. - data part - bytes up to [data_len](@ref a_redo_log_block_data_len) byte. Actual data bytes are followed by _0x00_ if the block is incomplete. 
@note Bytes within this fragment of the block, are enumerated by _sn_ sequence (whereas bytes of header and trailer are NOT). This is the only difference between _sn_ and _lsn_ sequences (_lsn_ enumerates also bytes of header and trailer). - trailer - _LOG_BLOCK_TRL_SIZE_ bytes (4): - checksum Algorithm used for the checksum depends on the configuration. Note that there is a potential problem if a crash happened just after switching to "checksums enabled". During recovery some log blocks would have checksum = LOG_NO_CHECKSUM_MAGIC and some would have a valid checksum. Then recovery with enabled checksums would point problems for the blocks without valid checksum. User would have to disable checksums for the recovery then. @remarks All fields except [first_rec_group](@ref a_redo_log_block_first_rec_group) are updated by the [log writer thread](@ref sect_redo_log_writer) just before writing the block. *******************************************************/ /** Writes fragment of log buffer to log files. The first write to the first log block in a new log file, flushes header of the file. It stops after doing single fil_io operation. The reason is that it might make sense to advance lsn up to which we have ready data in log buffer for write, after time consuming operation, such as fil_io. The log.write_lsn is advanced. @param[in] log redo log @param[in] buffer the beginning of first log block to write @param[in] buffer_size number of bytes to write since 'buffer' @param[in] start_lsn lsn corresponding to first block start */ static void log_files_write_buffer(log_t &log, byte *buffer, size_t buffer_size, lsn_t start_lsn); /* Waits until there is free space in log files for log_writer to proceed. @param[in] log redo log @param[in] last_write_lsn previous log.write_lsn @param[in] next_write_lsn next log.write_lsn @return lsn up to which possible write is limited */ static lsn_t log_writer_wait_on_checkpoint(log_t &log, lsn_t last_write_lsn, lsn_t next_write_lsn); /* Waits until the archiver has archived enough for log_writer to proceed or until the archiver becomes aborted. @param[in] log redo log @param[in] last_write_lsn previous log.write_lsn @param[in] next_write_lsn next log.write_lsn */ static void log_writer_wait_on_archiver(log_t &log, lsn_t last_write_lsn, lsn_t next_write_lsn); /** Writes fragment of the log buffer up to provided lsn (not further). Stops after the first call to fil_io() (possibly at smaller lsn). Main side-effect: log.write_lsn is advanced. @param[in] log redo log @param[in] next_write_lsn write up to this lsn value */ static void log_writer_write_buffer(log_t &log, lsn_t next_write_lsn); /** Executes a synchronous flush of the log files (doing fsyncs). Advances log.flushed_to_disk_lsn and notifies log flush_notifier thread. Note: if only a single log block was flushed to disk, user threads waiting for lsns within the block are notified directly from here, and log flush_notifier thread is not notified! (optimization) @param[in,out] log redo log */ static void log_flush_low(log_t &log); /**************************************************/ /** @name Waiting for redo log written or flushed up to lsn *******************************************************/ /* @{ */ /** Computes maximum number of spin rounds which should be used when waiting in user thread (for written or flushed redo) or 0 if busy waiting should not be used at all. 
@param[in]  min_non_zero_value  minimum allowed value (unless 0 is returned)
@return maximum number of spin rounds or 0 */
static inline uint64_t log_max_spins_when_waiting_in_user_thread(
    uint64_t min_non_zero_value) {
  uint64_t max_spins;

  /* Get current cpu usage. */
  const double cpu = srv_cpu_usage.utime_pct;

  /* Get high-watermark - when cpu usage is higher, don't spin! */
  const uint32_t hwm = srv_log_spin_cpu_pct_hwm;

  if (srv_cpu_usage.utime_abs < srv_log_spin_cpu_abs_lwm || cpu >= hwm) {
    /* Don't spin because either cpu usage is too high or it's
    almost idle so no reason to bother. */
    max_spins = 0;

  } else if (cpu >= hwm / 2) {
    /* When cpu usage is more than 50% of the hwm, use the minimum
    allowed number of spin rounds, not to increase cpu usage too
    much (risky). */
    max_spins = min_non_zero_value;

  } else {
    /* When cpu usage is less than 50% of the hwm, choose the maximum
    spin rounds in range [minimum, 10*minimum]. The smaller the cpu
    usage is, the more spin rounds might be used. */
    const double r = 1.0 * (hwm / 2 - cpu) / (hwm / 2);

    max_spins =
        static_cast<uint64_t>(min_non_zero_value + r * min_non_zero_value * 9);
  }

  return (max_spins);
}

/** Waits until redo log is written up to provided lsn (or greater).
We do not care if it's flushed or not.
@param[in]  log  redo log
@param[in]  lsn  wait until log.write_lsn >= lsn
@return statistics related to waiting inside */
static Wait_stats log_wait_for_write(const log_t &log, lsn_t lsn) {
  os_event_set(log.writer_event);

  const uint64_t max_spins = log_max_spins_when_waiting_in_user_thread(
      srv_log_wait_for_write_spin_delay);

  auto stop_condition = [&log, lsn](bool wait) {
    if (log.write_lsn.load() >= lsn) {
      return (true);
    }

    if (wait) {
      os_event_set(log.writer_event);
    }

    ut_d(log_background_write_threads_active_validate(log));

    return (false);
  };

  size_t slot =
      (lsn - 1) / OS_FILE_LOG_BLOCK_SIZE & (log.write_events_size - 1);

  const auto wait_stats =
      os_event_wait_for(log.write_events[slot], max_spins,
                        srv_log_wait_for_write_timeout, stop_condition);

  MONITOR_INC_WAIT_STATS(MONITOR_LOG_ON_WRITE_, wait_stats);

  return (wait_stats);
}

/** Waits until redo log is flushed up to provided lsn (or greater).
@param[in] log redo log @param[in] lsn wait until log.flushed_to_disk_lsn >= lsn @return statistics related to waiting inside */ static Wait_stats log_wait_for_flush(const log_t &log, lsn_t lsn) { if (log.write_lsn.load(std::memory_order_relaxed) < lsn) { os_event_set(log.writer_event); } os_event_set(log.flusher_event); uint64_t max_spins = log_max_spins_when_waiting_in_user_thread( srv_log_wait_for_flush_spin_delay); if (log.flush_avg_time >= srv_log_wait_for_flush_spin_hwm) { max_spins = 0; } auto stop_condition = [&log, lsn](bool wait) { LOG_SYNC_POINT("log_wait_for_flush_before_flushed_to_disk_lsn"); if (log.flushed_to_disk_lsn.load() >= lsn) { return (true); } if (wait) { if (log.write_lsn.load(std::memory_order_relaxed) < lsn) { os_event_set(log.writer_event); } os_event_set(log.flusher_event); } LOG_SYNC_POINT("log_wait_for_flush_before_wait"); return (false); }; size_t slot = (lsn - 1) / OS_FILE_LOG_BLOCK_SIZE & (log.flush_events_size - 1); const auto wait_stats = os_event_wait_for(log.flush_events[slot], max_spins, srv_log_wait_for_flush_timeout, stop_condition); MONITOR_INC_WAIT_STATS(MONITOR_LOG_ON_FLUSH_, wait_stats); return (wait_stats); } Wait_stats log_write_up_to(log_t &log, lsn_t end_lsn, bool flush_to_disk) { ut_a(!srv_read_only_mode); /* If we were updating log.flushed_to_disk_lsn while parsing redo log during recovery, we would have valid value here and we would not need to explicitly exit because of the recovery. However we do not update the log.flushed_to_disk during recovery (it is zero). On the other hand, when we apply log records during recovery, we modify pages and update their oldest/newest_modification. The modified pages become dirty. When size of the buffer pool is too small, some pages have to be flushed from LRU, to reclaim a free page for a next read. When flushing such dirty pages, we notice that newest_modification != 0, so the redo log has to be flushed up to the newest_modification, before flushing the page. In such case we end up here during recovery. Note that redo log is actually flushed, because changes to the page are caused by applying the redo. */ if (recv_no_ibuf_operations) { /* Recovery is running and no operations on the log files are allowed yet, which is implicitly deduced from the fact, that still ibuf merges are disallowed. */ return (Wait_stats{0}); } /* We do not need to have exact numbers and we do not care if we lost some increments for heavy workload. The value only has usage when it is low workload and we need to discover that we request redo write or flush only from time to time. In such case we prefer to avoid spinning in log threads to save on CPU power usage. */ log.write_to_file_requests_total.store( log.write_to_file_requests_total.load(std::memory_order_relaxed) + 1, std::memory_order_relaxed); ut_a(end_lsn != LSN_MAX); ut_a(end_lsn % OS_FILE_LOG_BLOCK_SIZE == 0 || end_lsn % OS_FILE_LOG_BLOCK_SIZE >= LOG_BLOCK_HDR_SIZE); ut_a(end_lsn % OS_FILE_LOG_BLOCK_SIZE <= OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE); ut_ad(end_lsn <= log_get_lsn(log)); if (flush_to_disk) { if (log.flushed_to_disk_lsn.load() >= end_lsn) { return (Wait_stats{0}); } Wait_stats wait_stats{0}; if (srv_flush_log_at_trx_commit != 1) { /* We need redo flushed, but because trx != 1, we have disabled notifications sent from log_writer to log_flusher. The log_flusher might be sleeping for 1 second, and we need quick response here. Log_writer avoids waking up log_flusher, so we must do it ourselves here. 
However, before we wake up log_flusher, we must ensure that log.write_lsn >= lsn. Otherwise log_flusher could flush some data which was ready for lsn values smaller than end_lsn and return to sleeping for next 1 second. */ if (log.write_lsn.load() < end_lsn) { wait_stats = log_wait_for_write(log, end_lsn); } } /* Wait until log gets flushed up to end_lsn. */ return (wait_stats + log_wait_for_flush(log, end_lsn)); } else { if (log.write_lsn.load() >= end_lsn) { return (Wait_stats{0}); } /* Wait until log gets written up to end_lsn. */ return (log_wait_for_write(log, end_lsn)); } } /* @} */ /**************************************************/ /** @name Log threads waiting strategy *******************************************************/ /* @{ */ /** Small utility which is used inside log threads when they have to wait for next interesting event to happen. For performance reasons, it might make sense to use spin-delay in front of the wait on event in such cases. The strategy is first to spin and then to fallback to the wait on event. However, for idle servers or work-loads which do not need redo being flushed as often, we prefer to avoid spinning. This utility solves such problems and provides waiting mechanism. */ struct Log_thread_waiting { Log_thread_waiting(const log_t &log, os_event_t event, uint64_t spin_delay, uint64_t min_timeout) : m_log(log), m_event{event}, m_spin_delay{static_cast(std::min( uint64_t(std::numeric_limits::max()), spin_delay))}, m_min_timeout{static_cast( /* No more than 1s */ std::min(uint64_t{1000 * 1000}, min_timeout))} {} template inline Wait_stats wait(Stop_condition stop_condition) { auto spin_delay = m_spin_delay; auto min_timeout = m_min_timeout; /** We might read older value, it just decides on spinning. Correctness does not depend on this. Only local performance might depend on this but it's anyway heuristic and depends on average which by definition has lag. No reason to make extra barriers here. */ const auto req_interval = m_log.write_to_file_requests_interval.load(std::memory_order_relaxed); if (srv_cpu_usage.utime_abs < srv_log_spin_cpu_abs_lwm || !log_write_to_file_requests_are_frequent(req_interval)) { /* Either: 1. CPU usage is very low on the server, which means the server is most likely idle or almost idle. 2. Request to write/flush redo to disk comes only once per 1ms in average or even less often. In both cases we prefer not to spend on CPU power, because there is no real gain from spinning in log threads then. */ spin_delay = 0; min_timeout = static_cast(req_interval < 1000 ? 
req_interval : 1000); } const auto wait_stats = os_event_wait_for(m_event, spin_delay, min_timeout, stop_condition); return (wait_stats); } private: const log_t &m_log; os_event_t m_event; const uint32_t m_spin_delay; const uint32_t m_min_timeout; }; struct Log_write_to_file_requests_monitor { explicit Log_write_to_file_requests_monitor(log_t &log) : m_log(log), m_last_requests_value{0}, m_request_interval{0} { m_last_requests_time = Log_clock::now(); } void update() { const auto requests_value = m_log.write_to_file_requests_total.load(std::memory_order_relaxed); const auto current_time = Log_clock::now(); if (current_time < m_last_requests_time) { m_last_requests_time = current_time; return; } const auto delta_time = current_time - m_last_requests_time; const auto delta_time_us = std::chrono::duration_cast(delta_time) .count(); if (requests_value > m_last_requests_value) { const auto delta_requests = requests_value - m_last_requests_value; const auto request_interval = delta_time_us / delta_requests; m_request_interval = (m_request_interval * 63 + request_interval) / 64; } else if (delta_time_us > 100 * 1000) { /* Last call to log_write_up_to() was longer than 100ms ago, so consider this as maximum time between calls we can expect. Tracking higher values does not make sense, because it is for sure already higher than any reasonable threshold which can be used to differ different activity modes. */ m_request_interval = 100 * 1000; /* 100ms */ } else { /* No progress in number of requests and still no more than 1second since last progress. Postpone any decision. */ return; } m_log.write_to_file_requests_interval.store(m_request_interval, std::memory_order_relaxed); MONITOR_SET(MONITOR_LOG_WRITE_TO_FILE_REQUESTS_INTERVAL, m_request_interval); m_last_requests_time = current_time; m_last_requests_value = requests_value; } private: log_t &m_log; uint64_t m_last_requests_value; Log_clock_point m_last_requests_time; uint64_t m_request_interval; }; /* @} */ /**************************************************/ /** @name Log writer thread *******************************************************/ /* @{ */ #else /* !UNIV_HOTBACKUP */ #define log_writer_mutex_own(log) true #endif /* !UNIV_HOTBACKUP */ uint64_t log_files_size_offset(const log_t &log, uint64_t offset) { ut_ad(log_writer_mutex_own(log)); return (offset - LOG_FILE_HDR_SIZE * (1 + offset / log.file_size)); } uint64_t log_files_real_offset(const log_t &log, uint64_t offset) { ut_ad(log_writer_mutex_own(log)); return (offset + LOG_FILE_HDR_SIZE * (1 + offset / (log.file_size - LOG_FILE_HDR_SIZE))); } uint64_t log_files_real_offset_for_lsn(const log_t &log, lsn_t lsn) { uint64_t size_offset; uint64_t size_capacity; uint64_t delta; ut_ad(log_writer_mutex_own(log)); size_capacity = log.n_files * (log.file_size - LOG_FILE_HDR_SIZE); if (lsn >= log.current_file_lsn) { delta = lsn - log.current_file_lsn; delta = delta % size_capacity; } else { /* Special case because lsn and offset are unsigned. 
*/ delta = log.current_file_lsn - lsn; delta = size_capacity - delta % size_capacity; } size_offset = log_files_size_offset(log, log.current_file_real_offset); size_offset = (size_offset + delta) % size_capacity; return (log_files_real_offset(log, size_offset)); } #ifndef UNIV_HOTBACKUP void log_files_update_offsets(log_t &log, lsn_t lsn) { ut_ad(log_writer_mutex_own(log)); ut_a(log.file_size > 0); ut_a(log.n_files > 0); lsn = ut_uint64_align_down(lsn, OS_FILE_LOG_BLOCK_SIZE); log.current_file_real_offset = log_files_real_offset_for_lsn(log, lsn); /* Real offsets never enter headers of files when calculated for some LSN / size offset. */ ut_a(log.current_file_real_offset % log.file_size >= LOG_FILE_HDR_SIZE); log.current_file_lsn = lsn; log.current_file_end_offset = log.current_file_real_offset - log.current_file_real_offset % log.file_size + log.file_size; ut_a(log.current_file_end_offset % log.file_size == 0); } namespace Log_files_write_impl { static inline void validate_buffer(const log_t &log, const byte *buffer, size_t buffer_size) { ut_a(buffer >= log.buf); ut_a(buffer_size > 0); ut_a(buffer + buffer_size <= log.buf + log.buf_size); } static inline void validate_start_lsn(const log_t &log, lsn_t start_lsn, size_t buffer_size) { /* start_lsn corresponds to block, it must be aligned to 512 */ ut_a(start_lsn % OS_FILE_LOG_BLOCK_SIZE == 0); /* Either full log block writes are possible or partial writes, which have to cover full header of log block then. */ ut_a((start_lsn + buffer_size) % OS_FILE_LOG_BLOCK_SIZE >= LOG_BLOCK_HDR_SIZE || (start_lsn + buffer_size) % OS_FILE_LOG_BLOCK_SIZE == 0); /* Partial writes do not touch footer of log block. */ ut_a((start_lsn + buffer_size) % OS_FILE_LOG_BLOCK_SIZE < OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE); /* There are no holes. Note that possibly start_lsn is smaller, because it always points to the beginning of log block. */ ut_a(start_lsn <= log.write_lsn.load()); } static inline uint64_t compute_real_offset(const log_t &log, lsn_t start_lsn) { ut_a(start_lsn >= log.current_file_lsn); ut_a(log.current_file_real_offset % log.file_size >= LOG_FILE_HDR_SIZE); const auto real_offset = log.current_file_real_offset + (start_lsn - log.current_file_lsn); ut_a(real_offset % log.file_size >= LOG_FILE_HDR_SIZE || real_offset == log.current_file_end_offset); ut_a(real_offset % OS_FILE_LOG_BLOCK_SIZE == 0); ut_a(log_files_real_offset_for_lsn(log, start_lsn) == real_offset % log.files_real_capacity || real_offset == log.current_file_end_offset); return (real_offset); } static inline bool current_file_has_space(const log_t &log, uint64_t offset, size_t size) { return (offset + size <= log.current_file_end_offset); } static void start_next_file(log_t &log, lsn_t start_lsn) { const auto before_update = log.current_file_end_offset; auto real_offset = before_update; ut_a(log.file_size % OS_FILE_LOG_BLOCK_SIZE == 0); ut_a(real_offset / log.file_size <= ULINT_MAX); ut_a(real_offset <= log.files_real_capacity); if (real_offset == log.files_real_capacity) { /* Wrapped log files, start at file 0, just after its initial headers. */ real_offset = LOG_FILE_HDR_SIZE; } ut_a(real_offset + OS_FILE_LOG_BLOCK_SIZE <= log.files_real_capacity); /* Flush header of the new log file. */ uint32_t nth_file = static_cast(real_offset / log.file_size); log_files_header_flush(log, nth_file, start_lsn); /* Update following members of log: - current_file_lsn, - current_file_real_offset, - current_file_end_offset. 
The only reason is to optimize future calculations of offsets within the new log file. */ log_files_update_offsets(log, start_lsn); ut_a(log.current_file_real_offset == before_update + LOG_FILE_HDR_SIZE || (before_update == log.files_real_capacity && log.current_file_real_offset == LOG_FILE_HDR_SIZE)); ut_a(log.current_file_real_offset - LOG_FILE_HDR_SIZE == log.current_file_end_offset - log.file_size); log.write_ahead_end_offset = 0; } static inline bool write_ahead_enough(uint64_t write_ahead_end, uint64_t offset, size_t size) { return (write_ahead_end >= offset + size); } static inline bool current_write_ahead_enough(const log_t &log, uint64_t offset, size_t size) { return (write_ahead_enough(log.write_ahead_end_offset, offset, size)); } static inline uint64_t compute_next_write_ahead_end(uint64_t real_offset) { const auto last_wa = ut_uint64_align_down(real_offset, srv_log_write_ahead_size); const auto next_wa = last_wa + srv_log_write_ahead_size; ut_a(next_wa > real_offset); ut_a(next_wa % srv_log_write_ahead_size == 0); return (next_wa); } static inline size_t compute_how_much_to_write(const log_t &log, uint64_t real_offset, size_t buffer_size, bool &write_from_log_buffer) { size_t write_size; /* First we ensure, that we will write within single log file. If we had more to write and cannot fit the current log file, we first write what fits, then stops and returns to the main loop of the log writer thread. Then, the log writer will update maximum lsn up to which, it has data ready in the log buffer, and request next write operation according to its strategy. */ if (!current_file_has_space(log, real_offset, buffer_size)) { /* The end of write would not fit the current log file. */ /* But the beginning is guaranteed to fit or to be placed at the first byte of the next file. */ ut_a(current_file_has_space(log, real_offset, 0)); if (!current_file_has_space(log, real_offset, 1)) { /* The beginning of write is at the first byte of the next log file. Flush header of the next log file, advance current log file to the next, stop and return to the main loop of log writer. */ write_from_log_buffer = false; return (0); } else { /* We write across at least two consecutive log files. Limit current write to the first one and then retry for next_file. */ /* If the condition for real_offset + buffer_size holds, then the expression below is < buffer_size, which is size_t, so the typecast is ok. */ write_size = static_cast(log.current_file_end_offset - real_offset); ut_a(write_size <= buffer_size); ut_a(write_size % OS_FILE_LOG_BLOCK_SIZE == 0); } } else { write_size = buffer_size; ut_a(write_size % OS_FILE_LOG_BLOCK_SIZE >= LOG_BLOCK_HDR_SIZE || write_size % OS_FILE_LOG_BLOCK_SIZE == 0); ut_a(write_size % OS_FILE_LOG_BLOCK_SIZE < OS_FILE_LOG_BLOCK_SIZE - LOG_BLOCK_TRL_SIZE); } /* Now, we know we can write write_size bytes from the buffer, and we will do the write within single log file - current one. */ ut_a(write_size > 0); ut_a(real_offset >= log.current_file_real_offset); ut_a(real_offset + write_size <= log.current_file_end_offset); ut_a(log.current_file_real_offset / log.file_size + 1 == log.current_file_end_offset / log.file_size); /* We are interested in writing from log buffer only, if we had at least one completed block for write. Still we might decide not to write from the log buffer, because write-ahead is needed. In such case we could write together with the last incomplete block after copying. 
*/ write_from_log_buffer = write_size >= OS_FILE_LOG_BLOCK_SIZE; if (write_from_log_buffer) { MONITOR_INC(MONITOR_LOG_FULL_BLOCK_WRITES); } else { MONITOR_INC(MONITOR_LOG_PARTIAL_BLOCK_WRITES); } /* Check how much we have written ahead to avoid read-on-write. */ if (!current_write_ahead_enough(log, real_offset, write_size)) { if (!current_write_ahead_enough(log, real_offset, 1)) { /* Current write-ahead region has no space at all. */ const auto next_wa = compute_next_write_ahead_end(real_offset); if (!write_ahead_enough(next_wa, real_offset, write_size)) { /* ... and also the next write-ahead is too small. Therefore we have more data to write than size of the write-ahead. We write from the log buffer, skipping last fragment for which the write ahead is required. */ ut_a(write_from_log_buffer); write_size = next_wa - real_offset; ut_a((real_offset + write_size) % srv_log_write_ahead_size == 0); ut_a(write_size % OS_FILE_LOG_BLOCK_SIZE == 0); } else { /* We copy data to write_ahead buffer, and write from there doing write-ahead of the bigger region in the same time. */ write_from_log_buffer = false; } } else { /* We limit write up to the end of region we have written ahead already. */ write_size = static_cast(log.write_ahead_end_offset - real_offset); ut_a(write_size >= OS_FILE_LOG_BLOCK_SIZE); ut_a(write_size % OS_FILE_LOG_BLOCK_SIZE == 0); } } else { if (write_from_log_buffer) { write_size = ut_uint64_align_down(write_size, OS_FILE_LOG_BLOCK_SIZE); } } return (write_size); } static inline void prepare_full_blocks(const log_t &log, byte *buffer, size_t size, lsn_t start_lsn, checkpoint_no_t checkpoint_no) { /* Prepare all completed blocks which are going to be written. Note, that completed blocks are always prepared in the log buffer, even if they are later copied to write_ahead buffer. This guarantees that finally we should have all blocks prepared in the log buffer (incomplete blocks will be rewritten once they became completed). 
*/ size_t buffer_offset; for (buffer_offset = 0; buffer_offset + OS_FILE_LOG_BLOCK_SIZE <= size; buffer_offset += OS_FILE_LOG_BLOCK_SIZE) { byte *ptr; ptr = buffer + buffer_offset; ut_a(ptr >= log.buf); ut_a(ptr + OS_FILE_LOG_BLOCK_SIZE <= log.buf + log.buf_size); log_block_set_hdr_no( ptr, log_block_convert_lsn_to_no(start_lsn + buffer_offset)); log_block_set_flush_bit(ptr, buffer_offset == 0); log_block_set_data_len(ptr, OS_FILE_LOG_BLOCK_SIZE); log_block_set_checkpoint_no(ptr, checkpoint_no); log_block_store_checksum(ptr); } } static inline void write_blocks(log_t &log, byte *write_buf, size_t write_size, uint64_t real_offset) { ut_a(write_size >= OS_FILE_LOG_BLOCK_SIZE); ut_a(write_size % OS_FILE_LOG_BLOCK_SIZE == 0); ut_a(real_offset / UNIV_PAGE_SIZE <= PAGE_NO_MAX); page_no_t page_no; page_no = static_cast(real_offset / univ_page_size.physical()); ut_a(log.write_ahead_end_offset % srv_log_write_ahead_size == 0); ut_a(real_offset + write_size <= log.write_ahead_end_offset || (real_offset + write_size) % srv_log_write_ahead_size == 0); auto err = fil_redo_io( IORequestLogWrite, page_id_t{log.files_space_id, page_no}, univ_page_size, static_cast(real_offset % UNIV_PAGE_SIZE), write_size, write_buf); meb::redo_log_archive_produce(write_buf, write_size); ut_a(err == DB_SUCCESS); } static inline size_t compute_write_event_slot(const log_t &log, lsn_t lsn) { return ((lsn / OS_FILE_LOG_BLOCK_SIZE) & (log.write_events_size - 1)); } static inline void notify_about_advanced_write_lsn(log_t &log, lsn_t old_write_lsn, lsn_t new_write_lsn) { if (srv_flush_log_at_trx_commit == 1) { os_event_set(log.flusher_event); } const auto first_slot = compute_write_event_slot(log, old_write_lsn); const auto last_slot = compute_write_event_slot(log, new_write_lsn); if (first_slot == last_slot) { LOG_SYNC_POINT("log_write_before_users_notify"); os_event_set(log.write_events[first_slot]); } else { LOG_SYNC_POINT("log_write_before_notifier_notify"); os_event_set(log.write_notifier_event); } if (arch_log_sys && arch_log_sys->is_active()) { os_event_set(log_archiver_thread_event); } } static inline void copy_to_write_ahead_buffer(log_t &log, const byte *buffer, size_t &size, lsn_t start_lsn, checkpoint_no_t checkpoint_no) { ut_a(size <= srv_log_write_ahead_size); ut_a(buffer >= log.buf); ut_a(buffer + size <= log.buf + log.buf_size); byte *write_buf = log.write_ahead_buf; LOG_SYNC_POINT("log_writer_before_copy_to_write_ahead_buffer"); std::memcpy(write_buf, buffer, size); size_t completed_blocks_size; byte *incomplete_block; size_t incomplete_size; completed_blocks_size = ut_uint64_align_down(size, OS_FILE_LOG_BLOCK_SIZE); incomplete_block = write_buf + completed_blocks_size; incomplete_size = size % OS_FILE_LOG_BLOCK_SIZE; ut_a(incomplete_block + incomplete_size <= write_buf + srv_log_write_ahead_size); if (incomplete_size != 0) { /* Prepare the incomplete (last) block. 
*/ ut_a(incomplete_size >= LOG_BLOCK_HDR_SIZE); log_block_set_hdr_no( incomplete_block, log_block_convert_lsn_to_no(start_lsn + completed_blocks_size)); log_block_set_flush_bit(incomplete_block, completed_blocks_size == 0); log_block_set_data_len(incomplete_block, incomplete_size); if (log_block_get_first_rec_group(incomplete_block) > incomplete_size) { log_block_set_first_rec_group(incomplete_block, 0); } log_block_set_checkpoint_no(incomplete_block, checkpoint_no); std::memset(incomplete_block + incomplete_size, 0x00, OS_FILE_LOG_BLOCK_SIZE - incomplete_size); log_block_store_checksum(incomplete_block); size = completed_blocks_size + OS_FILE_LOG_BLOCK_SIZE; } /* Since now, size is about completed blocks always. */ ut_a(size % OS_FILE_LOG_BLOCK_SIZE == 0); } static inline size_t prepare_for_write_ahead(log_t &log, uint64_t real_offset, size_t &write_size) { /* We need to perform write ahead during this write. */ const auto next_wa = compute_next_write_ahead_end(real_offset); ut_a(real_offset + write_size <= next_wa); size_t write_ahead = static_cast(next_wa - (real_offset + write_size)); if (!current_file_has_space(log, real_offset, write_size + write_ahead)) { /* We must not write further than to the end of the current log file. Note, that: log.file_size - LOG_FILE_HDR_SIZE does not have to be divisible by size of write ahead. Example given: innodb_log_file_size = 1024M, innodb_log_write_ahead_size = 4KiB, LOG_FILE_HDR_SIZE is 2KiB. */ write_ahead = static_cast(log.current_file_end_offset - real_offset - write_size); } ut_a(current_file_has_space(log, real_offset, write_size + write_ahead)); LOG_SYNC_POINT("log_writer_before_write_ahead"); std::memset(log.write_ahead_buf + write_size, 0x00, write_ahead); write_size += write_ahead; return (write_ahead); } static inline void update_current_write_ahead(log_t &log, uint64_t real_offset, size_t write_size) { const auto end = real_offset + write_size; if (end > log.write_ahead_end_offset) { log.write_ahead_end_offset = ut_uint64_align_down(end, srv_log_write_ahead_size); } } } // namespace Log_files_write_impl static void log_files_write_buffer(log_t &log, byte *buffer, size_t buffer_size, lsn_t start_lsn) { ut_ad(log_writer_mutex_own(log)); using namespace Log_files_write_impl; validate_buffer(log, buffer, buffer_size); validate_start_lsn(log, start_lsn, buffer_size); checkpoint_no_t checkpoint_no = log.next_checkpoint_no.load(); const auto real_offset = compute_real_offset(log, start_lsn); bool write_from_log_buffer; auto write_size = compute_how_much_to_write(log, real_offset, buffer_size, write_from_log_buffer); if (write_size == 0) { start_next_file(log, start_lsn); return; } prepare_full_blocks(log, buffer, write_size, start_lsn, checkpoint_no); byte *write_buf; uint64_t written_ahead = 0; lsn_t lsn_advance = write_size; if (write_from_log_buffer) { /* We have at least one completed log block to write. We write completed blocks from the log buffer. Note, that possibly we do not write all completed blocks, because of write-ahead strategy (described earlier). 
    DBUG_PRINT("ib_log",
               ("write from log buffer start_lsn=" LSN_PF " write_lsn=" LSN_PF
                " -> " LSN_PF,
                start_lsn, log.write_lsn.load(), start_lsn + lsn_advance));

    write_buf = buffer;

    LOG_SYNC_POINT("log_writer_before_write_from_log_buffer");

  } else {
    DBUG_PRINT("ib_log",
               ("incomplete write start_lsn=" LSN_PF " write_lsn=" LSN_PF
                " -> " LSN_PF,
                start_lsn, log.write_lsn.load(), start_lsn + lsn_advance));

#ifdef UNIV_DEBUG
    if (start_lsn == log.write_lsn.load()) {
      LOG_SYNC_POINT("log_writer_before_write_new_incomplete_block");
    }
    /* Else: we are doing yet another incomplete block write within the same
    block as the one in which we did the previous write. */
#endif /* UNIV_DEBUG */

    write_buf = log.write_ahead_buf;

    /* We write all the data directly from the write-ahead buffer, to which
    we first need to copy the data. */
    copy_to_write_ahead_buffer(log, buffer, write_size, start_lsn,
                               checkpoint_no);

    if (!current_write_ahead_enough(log, real_offset, 1)) {
      written_ahead = prepare_for_write_ahead(log, real_offset, write_size);
    }
  }

  srv_stats.os_log_pending_writes.inc();

  /* Now we know that we are going to write completed blocks only
  (originally completed, or copied and completed). */
  write_blocks(log, write_buf, write_size, real_offset);

  LOG_SYNC_POINT("log_writer_before_lsn_update");

  const lsn_t old_write_lsn = log.write_lsn.load();

  const lsn_t new_write_lsn = start_lsn + lsn_advance;
  ut_a(new_write_lsn > log.write_lsn.load());

  log.write_lsn.store(new_write_lsn);

  notify_about_advanced_write_lsn(log, old_write_lsn, new_write_lsn);

  LOG_SYNC_POINT("log_writer_before_buf_limit_update");

  log_update_buf_limit(log, new_write_lsn);

  srv_stats.os_log_pending_writes.dec();
  srv_stats.log_writes.inc();

  /* Write ahead is included in write_size. */
  ut_a(write_size >= written_ahead);
  srv_stats.os_log_written.add(write_size - written_ahead);
  MONITOR_INC_VALUE(MONITOR_LOG_PADDED, written_ahead);

  int64_t free_space = log.lsn_capacity_for_writer - log.extra_margin;

  /* The free space may be negative (up to -log.extra_margin), in which
  case we are in the emergency mode, eating into the extra margin and
  asking to increase the concurrency_margin. */
  free_space -= new_write_lsn - log.last_checkpoint_lsn.load();

  MONITOR_SET(MONITOR_LOG_FREE_SPACE, free_space);

  log.n_log_ios++;

  update_current_write_ahead(log, real_offset, write_size);
}

static lsn_t log_writer_wait_on_checkpoint(log_t &log, lsn_t last_write_lsn,
                                           lsn_t next_write_lsn) {
  const int32_t SLEEP_BETWEEN_RETRIES_IN_US = 100;  /* 100us */
  const int32_t TIME_UNTIL_ERROR_IN_US = 5000000;   /* 5s */

  ut_ad(log_writer_mutex_own(log));

  int32_t count = 1;
  lsn_t checkpoint_limited_lsn = LSN_MAX;

  while (true) {
    lsn_t checkpoint_lsn = log.last_checkpoint_lsn.load();

    checkpoint_lsn =
        ut_uint64_align_down(checkpoint_lsn, OS_FILE_LOG_BLOCK_SIZE);

    checkpoint_limited_lsn = checkpoint_lsn + log.lsn_capacity_for_writer;

    ut_a(last_write_lsn <= checkpoint_limited_lsn);
    ut_a(next_write_lsn > checkpoint_lsn);

    if (next_write_lsn + log.extra_margin <= checkpoint_limited_lsn) {
      log.concurrency_margin_ok = true;
      break;
    }

    if (log.concurrency_margin_ok) {
      log.concurrency_margin_ok = false;
      log_increase_concurrency_margin(log);
    }

    os_event_set(log.checkpointer_event);

    if (last_write_lsn + OS_FILE_LOG_BLOCK_SIZE <= checkpoint_limited_lsn) {
      /* Write what we have - adjust our speed to the speed at which
      checkpoints advance (i.e. to the speed of the page cleaners). */
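      /* At this point next_write_lsn would eat into log.extra_margin, so
      the concurrency margin has already been increased above and the
      checkpointer has been signalled; we break out and write as much as
      currently fits below checkpoint_limited_lsn instead of stalling the
      log writer completely. */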
      break;
    }

    (void)log_advance_ready_for_write_lsn(log);

    const int32_t ATTEMPTS_UNTIL_ERROR =
        TIME_UNTIL_ERROR_IN_US / SLEEP_BETWEEN_RETRIES_IN_US;

    if (count % ATTEMPTS_UNTIL_ERROR == 0) {
      /* We could not reclaim even a single redo block for 5 seconds. */
      ib::error(ER_IB_MSG_1234) << "Out of space in the redo log."
                                   " Checkpoint LSN: "
                                << checkpoint_lsn << ".";
    }

    log_writer_mutex_exit(log);

    /* We don't want to ask for a synchronous checkpoint, because it is
    possible that the oldest dirty page is latched and the user thread
    which keeps the latch is waiting for space in the log buffer (i.e. for
    log_writer to write to disk). In such a case it would be a deadlock
    (we can't flush the latched page and advance the checkpoint). We only
    ask for the checkpoint and wait for some time. */
    log_request_checkpoint(log, false);

    count++;
    os_thread_sleep(SLEEP_BETWEEN_RETRIES_IN_US);
    MONITOR_INC(MONITOR_LOG_WRITER_ON_FREE_SPACE_WAITS);

    log_writer_mutex_enter(log);
  }

  return checkpoint_limited_lsn;
}

static void log_writer_wait_on_archiver(log_t &log, lsn_t last_write_lsn,
                                        lsn_t next_write_lsn) {
  const int32_t SLEEP_BETWEEN_RETRIES_IN_US = 100;    /* 100us */
  const int32_t TIME_BETWEEN_WARNINGS_IN_US = 100000; /* 100ms */
  const int32_t TIME_UNTIL_ERROR_IN_US = 1000000;     /* 1s */

  ut_ad(log_writer_mutex_own(log));

  int32_t count = 0;

  while (arch_log_sys != nullptr && arch_log_sys->is_active()) {
    lsn_t archiver_lsn = arch_log_sys->get_archived_lsn();

    archiver_lsn = ut_uint64_align_down(archiver_lsn, OS_FILE_LOG_BLOCK_SIZE);

    lsn_t archiver_limited_lsn = archiver_lsn + log.lsn_capacity_for_writer;

    ut_a(next_write_lsn > archiver_lsn);

    if (next_write_lsn <= archiver_limited_lsn) {
      /* Between archiver_lsn and next_write_lsn there are fewer bytes than
      the capacity of all log files. Writing the log up to next_write_lsn
      will not overwrite data at archiver_lsn, so there is no need to wait
      for the archiver. */
      break;
    }

    (void)log_advance_ready_for_write_lsn(log);

    const int32_t ATTEMPTS_UNTIL_ERROR =
        TIME_UNTIL_ERROR_IN_US / SLEEP_BETWEEN_RETRIES_IN_US;

    if (count >= ATTEMPTS_UNTIL_ERROR) {
      log_writer_mutex_exit(log);

      arch_log_sys->force_abort();

      const lsn_t lag = next_write_lsn - archiver_limited_lsn;

      ib::error(ER_IB_MSG_1236)
          << "Log writer waited too long for redo-archiver"
             " to advance (1 second). There are unarchived: "
          << lag << " bytes. Archiver LSN: " << archiver_lsn
          << ". Aborted the redo-archiver.";

      log_writer_mutex_enter(log);
      break;
    }

    os_event_set(log_archiver_thread_event);

    log_writer_mutex_exit(log);

    const int32_t ATTEMPTS_BETWEEN_WARNINGS =
        TIME_BETWEEN_WARNINGS_IN_US / SLEEP_BETWEEN_RETRIES_IN_US;

    if (count % ATTEMPTS_BETWEEN_WARNINGS == 0) {
      const lsn_t lag = next_write_lsn - archiver_limited_lsn;

      ib::warn(ER_IB_MSG_1237) << "Log writer is waiting for redo-archiver"
                                  " to catch up unarchived: "
                               << lag << " bytes. Archiver LSN: "
                               << archiver_lsn << ".";
    }
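    /* With a 100 us sleep per iteration, the warning above repeats roughly
    every 100 ms and the error/abort path above triggers after about 1 s of
    waiting, per the constants defined at the top of this function. */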
    count++;
    os_thread_sleep(SLEEP_BETWEEN_RETRIES_IN_US);
    MONITOR_INC(MONITOR_LOG_WRITER_ON_ARCHIVER_WAITS);

    log_writer_mutex_enter(log);
  }
}

static void log_writer_write_buffer(log_t &log, lsn_t next_write_lsn) {
  ut_ad(log_writer_mutex_own(log));

  LOG_SYNC_POINT("log_writer_write_begin");

  const lsn_t last_write_lsn = log.write_lsn.load();

  ut_a(log_lsn_validate(last_write_lsn) ||
       last_write_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);

  ut_a(log_lsn_validate(next_write_lsn) ||
       next_write_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);

  ut_a(next_write_lsn - last_write_lsn <= log.buf_size);
  ut_a(next_write_lsn > last_write_lsn);

  size_t start_offset = last_write_lsn % log.buf_size;
  size_t end_offset = next_write_lsn % log.buf_size;

  if (start_offset >= end_offset) {
    ut_a(next_write_lsn - last_write_lsn >= log.buf_size - start_offset);

    end_offset = log.buf_size;
    next_write_lsn = last_write_lsn + (end_offset - start_offset);
  }

  ut_a(start_offset < end_offset);
  ut_a(end_offset % OS_FILE_LOG_BLOCK_SIZE == 0 ||
       end_offset % OS_FILE_LOG_BLOCK_SIZE >= LOG_BLOCK_HDR_SIZE);

  /* Wait until there is free space in the log files. */
  const lsn_t checkpoint_limited_lsn =
      log_writer_wait_on_checkpoint(log, last_write_lsn, next_write_lsn);

  ut_ad(log_writer_mutex_own(log));
  ut_a(checkpoint_limited_lsn > last_write_lsn);

  LOG_SYNC_POINT("log_writer_after_checkpoint_check");

  if (arch_log_sys != nullptr) {
    log_writer_wait_on_archiver(log, last_write_lsn, next_write_lsn);
  }

  ut_ad(log_writer_mutex_own(log));

  LOG_SYNC_POINT("log_writer_after_archiver_check");

  const lsn_t limit_for_next_write_lsn = checkpoint_limited_lsn;

  if (limit_for_next_write_lsn < next_write_lsn) {
    end_offset -= next_write_lsn - limit_for_next_write_lsn;
    next_write_lsn = limit_for_next_write_lsn;

    ut_a(end_offset > start_offset);
    ut_a(end_offset % OS_FILE_LOG_BLOCK_SIZE == 0 ||
         end_offset % OS_FILE_LOG_BLOCK_SIZE >= LOG_BLOCK_HDR_SIZE);

    ut_a(log_lsn_validate(next_write_lsn) ||
         next_write_lsn % OS_FILE_LOG_BLOCK_SIZE == 0);
  }

  DBUG_PRINT("ib_log",
             ("write " LSN_PF " to " LSN_PF, last_write_lsn, next_write_lsn));

  byte *buf_begin =
      log.buf + ut_uint64_align_down(start_offset, OS_FILE_LOG_BLOCK_SIZE);

  byte *buf_end = log.buf + end_offset;

  /* Do the write to the log files. */
  log_files_write_buffer(
      log, buf_begin, buf_end - buf_begin,
      ut_uint64_align_down(last_write_lsn, OS_FILE_LOG_BLOCK_SIZE));

  LOG_SYNC_POINT("log_writer_write_end");
}

void log_writer(log_t *log_ptr) {
  ut_a(log_ptr != nullptr);

  log_t &log = *log_ptr;
  lsn_t ready_lsn = 0;

  log_writer_mutex_enter(log);

  Log_thread_waiting waiting{log, log.writer_event, srv_log_writer_spin_delay,
                             srv_log_writer_timeout};

  Log_write_to_file_requests_monitor write_to_file_requests_monitor{log};

  for (uint64_t step = 0;; ++step) {
    bool released = false;

    auto stop_condition = [&ready_lsn, &log, &released,
                           &write_to_file_requests_monitor](bool wait) {
      if (released) {
        log_writer_mutex_enter(log);
        released = false;
      }

      /* Advance the lsn up to which data is ready in the log buffer. */
      (void)log_advance_ready_for_write_lsn(log);

      ready_lsn = log_buffer_ready_for_write_lsn(log);

      /* Wait until any of the following conditions holds:
      1) There is some unwritten data in the log buffer.
      2) We should close threads. */
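      /* When asked to wait, the writer mutex is released first (the wait
      branch below), so other threads that need the writer mutex are not
      blocked while this thread sleeps on writer_event. */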
      if (log.write_lsn.load() < ready_lsn || log.should_stop_threads.load()) {
        return (true);
      }

      if (wait) {
        write_to_file_requests_monitor.update();
        log_writer_mutex_exit(log);
        released = true;
      }

      return (false);
    };

    const auto wait_stats = waiting.wait(stop_condition);

    MONITOR_INC_WAIT_STATS(MONITOR_LOG_WRITER_, wait_stats);

    /* Do the actual work. */
    if (log.write_lsn.load() < ready_lsn) {
      log_writer_write_buffer(log, ready_lsn);

      if (step % 1024 == 0) {
        write_to_file_requests_monitor.update();

        log_writer_mutex_exit(log);

        os_thread_sleep(0);

        log_writer_mutex_enter(log);
      }
    } else {
      if (log.should_stop_threads.load()) {
        /* When log threads are stopped, we must first ensure that all
        writes to the log buffer have been finished, and only then are we
        allowed to set should_stop_threads to true. */

        if (!log_advance_ready_for_write_lsn(log)) {
          break;
        }

        ready_lsn = log_buffer_ready_for_write_lsn(log);
      }
    }
  }

  log_writer_mutex_exit(log);
}

/* @} */

/**************************************************/ /**
 @name Log flusher thread
 *******************************************************/

/* @{ */

static void log_flush_update_stats(log_t &log) {
  ut_ad(log_flusher_mutex_own(log));

  /* Note that this code is inspired by similar logic in buf0flu.cc */

  static uint64_t iterations = 0;
  static Log_clock_point prev_time{};
  static lsn_t prev_lsn;
  static lsn_t lsn_avg_rate = 0;
  static Log_clock::duration fsync_max_time;
  static Log_clock::duration fsync_total_time;

  /* NOTE: the duration_cast / static_cast template arguments in this
  function were lost in this copy of the file; the microseconds, seconds,
  int64_t, double and lsn_t arguments restored below are inferred from the
  monitor names and surrounding code and should be treated as such. */

  /* Calculate the time of the last fsync and update related counters. */

  Log_clock::duration fsync_time;

  fsync_time = log.last_flush_end_time - log.last_flush_start_time;
  ut_a(fsync_time.count() >= 0);

  fsync_max_time = std::max(fsync_max_time, fsync_time);
  fsync_total_time += fsync_time;

  MONITOR_INC_VALUE(
      MONITOR_LOG_FLUSH_TOTAL_TIME,
      std::chrono::duration_cast<std::chrono::microseconds>(fsync_time)
          .count());

  /* Calculate the time elapsed since the start of the last sample. */

  if (prev_time == Log_clock_point{}) {
    prev_time = log.last_flush_start_time;
    prev_lsn = log.flushed_to_disk_lsn.load();
  }

  const Log_clock_point curr_time = log.last_flush_end_time;

  if (curr_time < prev_time) {
    /* Time was moved backward since we set prev_time. We cannot determine
    how much time has passed since then. */
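    /* Note that only prev_time is adjusted here; prev_lsn is left
    unchanged, so the next rate sample below still covers all lsn progress
    made since the previous sample. */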
    prev_time = curr_time;
  }

  auto time_elapsed =
      std::chrono::duration_cast<std::chrono::seconds>(curr_time - prev_time)
          .count();

  ut_a(time_elapsed >= 0);

  if (++iterations >= srv_flushing_avg_loops ||
      time_elapsed >= static_cast<int64_t>(srv_flushing_avg_loops)) {
    if (time_elapsed < 1) {
      time_elapsed = 1;
    }

    const lsn_t curr_lsn = log.flushed_to_disk_lsn.load();

    const lsn_t lsn_rate = static_cast<lsn_t>(
        static_cast<double>(curr_lsn - prev_lsn) / time_elapsed);

    lsn_avg_rate = (lsn_avg_rate + lsn_rate) / 2;

    MONITOR_SET(MONITOR_LOG_FLUSH_LSN_AVG_RATE, lsn_avg_rate);

    MONITOR_SET(
        MONITOR_LOG_FLUSH_MAX_TIME,
        std::chrono::duration_cast<std::chrono::microseconds>(fsync_max_time)
            .count());

    log.flush_avg_time =
        std::chrono::duration_cast<std::chrono::microseconds>(fsync_total_time)
            .count() *
        1.0 / iterations;

    MONITOR_SET(MONITOR_LOG_FLUSH_AVG_TIME, log.flush_avg_time);

    fsync_max_time = Log_clock::duration{};
    fsync_total_time = Log_clock::duration{};

    iterations = 0;

    prev_time = curr_time;
    prev_lsn = curr_lsn;
  }
}

static void log_flush_low(log_t &log) {
  ut_ad(log_flusher_mutex_own(log));

#ifndef _WIN32
  bool do_flush = srv_unix_file_flush_method != SRV_UNIX_O_DSYNC;
#else
  bool do_flush = true;
#endif

  os_event_reset(log.flusher_event);

  log.last_flush_start_time = Log_clock::now();

  const lsn_t last_flush_lsn = log.flushed_to_disk_lsn.load();

  const lsn_t flush_up_to_lsn = log.write_lsn.load();

  ut_a(flush_up_to_lsn > last_flush_lsn);

  if (do_flush) {
    LOG_SYNC_POINT("log_flush_before_fsync");

    fil_flush_file_redo();
  }

  log.last_flush_end_time = Log_clock::now();

  if (log.last_flush_end_time < log.last_flush_start_time) {
    /* Time was moved backward after we set start_time. Let us assume that
    the fsync operation was instant. We move start_time backward, because
    we don't want it to remain in the future. */
    log.last_flush_start_time = log.last_flush_end_time;
  }

  LOG_SYNC_POINT("log_flush_before_flushed_to_disk_lsn");

  log.flushed_to_disk_lsn.store(flush_up_to_lsn);

  /* Notify other thread(s). */

  DBUG_PRINT("ib_log", ("Flushed to disk up to " LSN_PF, flush_up_to_lsn));

  const auto first_slot =
      last_flush_lsn / OS_FILE_LOG_BLOCK_SIZE & (log.flush_events_size - 1);

  const auto last_slot = (flush_up_to_lsn - 1) / OS_FILE_LOG_BLOCK_SIZE &
                         (log.flush_events_size - 1);

  if (first_slot == last_slot) {
    LOG_SYNC_POINT("log_flush_before_users_notify");
    os_event_set(log.flush_events[first_slot]);
  } else {
    LOG_SYNC_POINT("log_flush_before_notifier_notify");
    os_event_set(log.flush_notifier_event);
  }

  /* Update stats. */

  log_flush_update_stats(log);
}

void log_flusher(log_t *log_ptr) {
  ut_a(log_ptr != nullptr);

  log_t &log = *log_ptr;

  Log_thread_waiting waiting{log, log.flusher_event, srv_log_flusher_spin_delay,
                             srv_log_flusher_timeout};

  log_flusher_mutex_enter(log);

  for (uint64_t step = 0;; ++step) {
    if (log.should_stop_threads.load()) {
      if (!log_writer_is_active()) {
        /* If write_lsn > flushed_to_disk_lsn, we are going to execute one
        more fsync just after the for-loop and before this thread exits
        (inside log_flush_low at the very end of this function). */
        break;
      }
    }

    bool released = false;

    auto stop_condition = [&log, &released, step](bool wait) {
      if (released) {
        log_flusher_mutex_enter(log);
        released = false;
      }

      LOG_SYNC_POINT("log_flusher_before_should_flush");

      const lsn_t last_flush_lsn = log.flushed_to_disk_lsn.load();

      ut_a(last_flush_lsn <= log.write_lsn.load());

      if (last_flush_lsn < log.write_lsn.load()) {
        /* Flush and stop waiting. */
        log_flush_low(log);

        if (step % 1024 == 0) {
          log_flusher_mutex_exit(log);

          os_thread_sleep(0);

          log_flusher_mutex_enter(log);
        }
        return (true);
      }

      /* Stop waiting if the writer thread is dead. */
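      /* Once the writer is gone, write_lsn can no longer advance, and any
      remaining gap between write_lsn and flushed_to_disk_lsn is handled by
      the final log_flush_low() call after the main loop below. */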
      if (log.should_stop_threads.load()) {
        if (!log_writer_is_active()) {
          return (true);
        }
      }

      if (wait) {
        log_flusher_mutex_exit(log);
        released = true;
      }

      return (false);
    };

    if (srv_flush_log_at_trx_commit != 1) {
      const auto current_time = Log_clock::now();

      ut_ad(log.last_flush_end_time >= log.last_flush_start_time);

      if (current_time < log.last_flush_end_time) {
        /* Time was moved backward, possibly by a lot, so we need to adjust
        the last_flush times, because otherwise we could stop flushing
        every innodb_flush_log_at_timeout for a while. */
        log.last_flush_start_time = current_time;
        log.last_flush_end_time = current_time;
      }

      const auto time_elapsed = current_time - log.last_flush_start_time;

      using us = std::chrono::microseconds;

      const auto time_elapsed_us =
          std::chrono::duration_cast<us>(time_elapsed).count();

      ut_a(time_elapsed_us >= 0);

      const auto flush_every = srv_flush_log_at_timeout;
      const auto flush_every_us = 1000000LL * flush_every;

      if (time_elapsed_us < flush_every_us) {
        log_flusher_mutex_exit(log);

        /* When we are asked to stop threads, do not respect the limit for
        flushes per second. */
        if (!log.should_stop_threads.load()) {
          os_event_wait_time_low(log.flusher_event,
                                 flush_every_us - time_elapsed_us, 0);
        }

        log_flusher_mutex_enter(log);
      }
    }

    const auto wait_stats = waiting.wait(stop_condition);

    MONITOR_INC_WAIT_STATS(MONITOR_LOG_FLUSHER_, wait_stats);
  }

  if (log.write_lsn.load() > log.flushed_to_disk_lsn.load()) {
    log_flush_low(log);
  }

  ut_a(log.write_lsn.load() == log.flushed_to_disk_lsn.load());

  log_flusher_mutex_exit(log);
}

/* @} */

/**************************************************/ /**
 @name Log write_notifier thread
 *******************************************************/

/* @{ */

void log_write_notifier(log_t *log_ptr) {
  ut_a(log_ptr != nullptr);

  log_t &log = *log_ptr;

  lsn_t lsn = log.write_lsn.load() + 1;

  log_write_notifier_mutex_enter(log);

  Log_thread_waiting waiting{log, log.write_notifier_event,
                             srv_log_write_notifier_spin_delay,
                             srv_log_write_notifier_timeout};

  for (uint64_t step = 0;; ++step) {
    if (log.should_stop_threads.load()) {
      if (!log_writer_is_active()) {
        if (lsn > log.write_lsn.load()) {
          ut_a(lsn == log.write_lsn.load() + 1);
          break;
        }
      }
    }

    LOG_SYNC_POINT("log_write_notifier_before_check");

    bool released = false;

    auto stop_condition = [&log, lsn, &released](bool wait) {
      LOG_SYNC_POINT("log_write_notifier_after_event_reset");
      if (released) {
        log_write_notifier_mutex_enter(log);
        released = false;
      }

      LOG_SYNC_POINT("log_write_notifier_before_check");

      if (log.write_lsn.load() >= lsn) {
        return (true);
      }

      if (log.should_stop_threads.load()) {
        if (!log_writer_is_active()) {
          return (true);
        }
      }

      if (wait) {
        log_write_notifier_mutex_exit(log);
        released = true;
      }
      LOG_SYNC_POINT("log_write_notifier_before_wait");

      return (false);
    };

    const auto wait_stats = waiting.wait(stop_condition);

    MONITOR_INC_WAIT_STATS(MONITOR_LOG_WRITE_NOTIFIER_, wait_stats);

    LOG_SYNC_POINT("log_write_notifier_before_write_lsn");

    const lsn_t write_lsn = log.write_lsn.load();

    const lsn_t notified_up_to_lsn =
        ut_uint64_align_up(write_lsn, OS_FILE_LOG_BLOCK_SIZE);

    while (lsn <= notified_up_to_lsn) {
      const auto slot =
          (lsn - 1) / OS_FILE_LOG_BLOCK_SIZE & (log.write_events_size - 1);

      lsn += OS_FILE_LOG_BLOCK_SIZE;

      LOG_SYNC_POINT("log_write_notifier_before_notify");

      os_event_set(log.write_events[slot]);
    }

    lsn = write_lsn + 1;

    if (step % 1024 == 0) {
      log_write_notifier_mutex_exit(log);

      os_thread_sleep(0);

      log_write_notifier_mutex_enter(log);
    }
  }

  log_write_notifier_mutex_exit(log);
}

/* @} */
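/* A note on the slot mapping used by the two notifier threads (above and
below): a waiter for lsn L sleeps on event slot
((L - 1) / OS_FILE_LOG_BLOCK_SIZE) & (events_size - 1), i.e. consecutive
512-byte blocks map to consecutive slots modulo the (power-of-two) size of
the event array. When a single write/flush advances the lsn across many
blocks, the notifier wakes each slot in that range, so no waiter is missed.
Illustrative example (the events_size value is assumed, not taken from this
file): with events_size = 2048 and OS_FILE_LOG_BLOCK_SIZE = 512, a waiter
for lsn 10000 sleeps on slot (9999 / 512) & 2047 = 19. */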
/**************************************************/ /**
 @name Log flush_notifier thread
 *******************************************************/

/* @{ */

void log_flush_notifier(log_t *log_ptr) {
  ut_a(log_ptr != nullptr);

  log_t &log = *log_ptr;

  lsn_t lsn = log.flushed_to_disk_lsn.load() + 1;

  log_flush_notifier_mutex_enter(log);

  Log_thread_waiting waiting{log, log.flush_notifier_event,
                             srv_log_flush_notifier_spin_delay,
                             srv_log_flush_notifier_timeout};

  for (uint64_t step = 0;; ++step) {
    if (log.should_stop_threads.load()) {
      if (!log_flusher_is_active()) {
        if (lsn > log.flushed_to_disk_lsn.load()) {
          ut_a(lsn == log.flushed_to_disk_lsn.load() + 1);
          break;
        }
      }
    }

    LOG_SYNC_POINT("log_flush_notifier_before_check");

    bool released = false;

    auto stop_condition = [&log, lsn, &released](bool wait) {
      LOG_SYNC_POINT("log_flush_notifier_after_event_reset");
      if (released) {
        log_flush_notifier_mutex_enter(log);
        released = false;
      }

      LOG_SYNC_POINT("log_flush_notifier_before_check");

      if (log.flushed_to_disk_lsn.load() >= lsn) {
        return (true);
      }

      if (log.should_stop_threads.load()) {
        if (!log_flusher_is_active()) {
          return (true);
        }
      }

      if (wait) {
        log_flush_notifier_mutex_exit(log);
        released = true;
      }
      LOG_SYNC_POINT("log_flush_notifier_before_wait");

      return (false);
    };

    const auto wait_stats = waiting.wait(stop_condition);

    MONITOR_INC_WAIT_STATS(MONITOR_LOG_FLUSH_NOTIFIER_, wait_stats);

    LOG_SYNC_POINT("log_flush_notifier_before_flushed_to_disk_lsn");

    const lsn_t flush_lsn = log.flushed_to_disk_lsn.load();

    const lsn_t notified_up_to_lsn =
        ut_uint64_align_up(flush_lsn, OS_FILE_LOG_BLOCK_SIZE);

    while (lsn <= notified_up_to_lsn) {
      const auto slot =
          (lsn - 1) / OS_FILE_LOG_BLOCK_SIZE & (log.flush_events_size - 1);

      lsn += OS_FILE_LOG_BLOCK_SIZE;

      LOG_SYNC_POINT("log_flush_notifier_before_notify");

      os_event_set(log.flush_events[slot]);
    }

    lsn = flush_lsn + 1;

    if (step % 1024 == 0) {
      log_flush_notifier_mutex_exit(log);

      os_thread_sleep(0);

      log_flush_notifier_mutex_enter(log);
    }
  }

  log_flush_notifier_mutex_exit(log);
}

/* @} */

/**************************************************/ /**
 @name Log closer thread
 *******************************************************/

/* @{ */

void log_closer(log_t *log_ptr) {
  ut_a(log_ptr != nullptr);

  log_t &log = *log_ptr;

  lsn_t end_lsn = 0;

  log_closer_mutex_enter(log);

  Log_thread_waiting waiting{log, log.closer_event, srv_log_closer_spin_delay,
                             srv_log_closer_timeout};

  for (uint64_t step = 0;; ++step) {
    bool released = false;

    auto stop_condition = [&log, &released, step](bool wait) {
      if (released) {
        log_closer_mutex_enter(log);
        released = false;
      }

      /* Advance the lsn up to which all the dirty pages have been added to
      flush lists. */
      if (log_advance_dirty_pages_added_up_to_lsn(log)) {
        if (step % 1024 == 0) {
          log_closer_mutex_exit(log);

          os_thread_sleep(0);

          log_closer_mutex_enter(log);
        }
        return (true);
      }

      if (log.should_stop_threads.load()) {
        return (true);
      }

      if (wait) {
        log_closer_mutex_exit(log);
        released = true;
      }
      return (false);
    };

    waiting.wait(stop_condition);

    /* Check if we should close the thread. */
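    /* The closer may exit only after both the flusher and the writer have
    exited and every dirty page covered by the written redo has been added
    to a flush list; the conditions below enforce exactly that order. */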
    if (log.should_stop_threads.load()) {
      if (!log_flusher_is_active() && !log_writer_is_active()) {
        end_lsn = log.write_lsn.load();

        ut_a(log_lsn_validate(end_lsn));
        ut_a(end_lsn == log.flushed_to_disk_lsn.load());
        ut_a(end_lsn == log_buffer_ready_for_write_lsn(log));

        ut_a(end_lsn >= log_buffer_dirty_pages_added_up_to_lsn(log));

        if (log_buffer_dirty_pages_added_up_to_lsn(log) == end_lsn) {
          /* All confirmed reservations have been written to redo and all
          dirty pages related to those writes have been added to flush
          lists.

          However, there could still be user threads in the middle of
          log_buffer_reserve() which have reserved a range of sn values but
          have not confirmed it yet. Note that because log_writer is no
          longer alive, the only scenario its death guarantees is that an
          x-lock is held at end_lsn, in which case end_lsn separates two
          regions of the log buffer: one completely full and one completely
          empty. */
          const lsn_t ready_lsn = log_buffer_ready_for_write_lsn(log);
          const lsn_t current_lsn = log_get_lsn(log);

          if (current_lsn > ready_lsn) {
            log.recent_written.validate_no_links(ready_lsn, current_lsn);
            log.recent_closed.validate_no_links(ready_lsn, current_lsn);
          }

          break;
        }
        /* We need to wait until the remaining dirty pages have been
        added. */
      }
      /* We prefer to wait until all writing is done. */
    }
  }

  log_closer_mutex_exit(log);
}

/* @} */

/**************************************************/ /**
 @name Log files encryption
 *******************************************************/

/* @{ */

bool log_read_encryption() {
  space_id_t log_space_id = dict_sys_t::s_log_space_first_id;
  const page_id_t page_id(log_space_id, 0);
  byte *log_block_buf_ptr;
  byte *log_block_buf;
  byte key[ENCRYPTION_KEY_LEN];
  byte iv[ENCRYPTION_KEY_LEN];
  fil_space_t *space = fil_space_get(log_space_id);
  dberr_t err;

  /* NOTE: the static_cast template arguments below were lost in this copy
  of the file; byte * is restored to match the declared buffer types. */
  log_block_buf_ptr =
      static_cast<byte *>(ut_malloc_nokey(2 * OS_FILE_LOG_BLOCK_SIZE));
  memset(log_block_buf_ptr, 0, 2 * OS_FILE_LOG_BLOCK_SIZE);
  log_block_buf =
      static_cast<byte *>(ut_align(log_block_buf_ptr, OS_FILE_LOG_BLOCK_SIZE));

  err = fil_redo_io(IORequestLogRead, page_id, univ_page_size, LOG_ENCRYPTION,
                    OS_FILE_LOG_BLOCK_SIZE, log_block_buf);

  ut_a(err == DB_SUCCESS);

  if (memcmp(log_block_buf + LOG_HEADER_CREATOR_END, ENCRYPTION_KEY_MAGIC_V3,
             ENCRYPTION_MAGIC_SIZE) == 0) {
    /* Make sure the keyring is loaded. */
    if (!Encryption::check_keyring()) {
      ut_free(log_block_buf_ptr);
      ib::error(ER_IB_MSG_1238) << "Redo log was encrypted,"
                                << " but keyring plugin is not loaded.";
      return (false);
    }

    if (Encryption::decode_encryption_info(
            key, iv, log_block_buf + LOG_HEADER_CREATOR_END, true)) {
      /* If redo log encryption is enabled, set the space flag. Otherwise,
      we just fill the encryption information into the space object for
      decrypting old redo log blocks. */
      fsp_flags_set_encryption(space->flags);
      err = fil_set_encryption(space->id, Encryption::AES, key, iv);

      if (err == DB_SUCCESS) {
        ut_free(log_block_buf_ptr);
        ib::info(ER_IB_MSG_1239) << "Successfully read redo log encryption"
                                 << " metadata.";
        return (true);
      } else {
        ut_free(log_block_buf_ptr);
        ib::error(ER_IB_MSG_1240) << "Can't set redo log tablespace"
                                  << " encryption metadata.";
        return (false);
      }
    } else {
      ut_free(log_block_buf_ptr);
      ib::error(ER_IB_MSG_1241) << "Cannot read the encryption"
                                   " information in the log file header;"
                                   " please check that the keyring plugin"
                                   " is loaded and the key file exists.";
      return (false);
    }
  }

  ut_free(log_block_buf_ptr);
  return (true);
}

bool log_file_header_fill_encryption(byte *buf, byte *key, byte *iv,
                                     bool is_boot, bool encrypt_key) {
  byte encryption_info[ENCRYPTION_INFO_SIZE];

  if (!Encryption::fill_encryption_info(key, iv, encryption_info, is_boot,
                                        encrypt_key)) {
    return (false);
  }

  ut_a(LOG_HEADER_CREATOR_END + ENCRYPTION_INFO_SIZE < OS_FILE_LOG_BLOCK_SIZE);

  memcpy(buf + LOG_HEADER_CREATOR_END, encryption_info, ENCRYPTION_INFO_SIZE);

  return (true);
}

bool log_write_encryption(byte *key, byte *iv, bool is_boot) {
  const page_id_t page_id{dict_sys_t::s_log_space_first_id, 0};
  byte *log_block_buf_ptr;
  byte *log_block_buf;

  /* NOTE: the static_cast template arguments below were lost in this copy
  of the file; byte * is restored to match the declared buffer types. */
  log_block_buf_ptr =
      static_cast<byte *>(ut_malloc_nokey(2 * OS_FILE_LOG_BLOCK_SIZE));
  memset(log_block_buf_ptr, 0, 2 * OS_FILE_LOG_BLOCK_SIZE);
  log_block_buf =
      static_cast<byte *>(ut_align(log_block_buf_ptr, OS_FILE_LOG_BLOCK_SIZE));

  if (key == NULL && iv == NULL) {
    fil_space_t *space = fil_space_get(dict_sys_t::s_log_space_first_id);

    key = space->encryption_key;
    iv = space->encryption_iv;
  }

  if (!log_file_header_fill_encryption(log_block_buf, key, iv, is_boot,
                                       true)) {
    ut_free(log_block_buf_ptr);
    return (false);
  }

  auto err = fil_redo_io(IORequestLogWrite, page_id, univ_page_size,
                         LOG_ENCRYPTION, OS_FILE_LOG_BLOCK_SIZE, log_block_buf);

  ut_a(err == DB_SUCCESS);

  ut_free(log_block_buf_ptr);
  return (true);
}

bool log_rotate_encryption() {
  fil_space_t *space = fil_space_get(dict_sys_t::s_log_space_first_id);

  if (!FSP_FLAGS_GET_ENCRYPTION(space->flags)) {
    return (true);
  }

  /* Rotate the log tablespace. */
  return (log_write_encryption(nullptr, nullptr, false));
}

void redo_rotate_default_master_key() {
  fil_space_t *space = fil_space_get(dict_sys_t::s_log_space_first_id);

  if (srv_shutdown_state.load() != SRV_SHUTDOWN_NONE) {
    return;
  }

  /* If the redo log space is using the default key, rotate it. We also
  need server_uuid to be initialized. */
  if (space->encryption_type != Encryption::NONE &&
      Encryption::s_master_key_id == ENCRYPTION_DEFAULT_MASTER_KEY_ID &&
      !srv_read_only_mode && strlen(server_uuid) > 0) {
    ut_a(FSP_FLAGS_GET_ENCRYPTION(space->flags));

    log_write_encryption(nullptr, nullptr, false);
  }
}

/* @} */

#endif /* !UNIV_HOTBACKUP */