/* Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License, version 2.0, as published by the Free Software Foundation. This program is also distributed with certain software (including but not limited to OpenSSL) that is licensed under separate terms, as designated in a particular file or component or in included license documentation. The authors of MySQL hereby grant you an additional permission to link the program and your derivative works with the separately licensed software that they have included with MySQL. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include "router_test_helpers.h" #include #include #include #include #include #include #include #include #include #ifndef _WIN32 #include #include #else #include #include #include #include #define getcwd _getcwd typedef long ssize_t; #endif #include "keyring/keyring_manager.h" #include "mysql/harness/filesystem.h" #include "mysqlrouter/mysql_session.h" #include "mysqlrouter/utils.h" using mysql_harness::Path; using namespace std::chrono_literals; Path get_cmake_source_dir() { Path result; // PB2 specific source location char *env_pb2workdir = std::getenv("PB2WORKDIR"); char *env_sourcename = std::getenv("SOURCENAME"); char *env_tmpdir = std::getenv("TMPDIR"); if ((env_pb2workdir && env_sourcename && env_tmpdir) && (strlen(env_pb2workdir) && strlen(env_tmpdir) && strlen(env_sourcename))) { result = Path(env_tmpdir); result.append(Path(env_sourcename)); if (result.exists()) { return result; } } char *env_value = std::getenv("CMAKE_SOURCE_DIR"); if (env_value == nullptr) { // try a few places result = Path(get_cwd()).join(".."); result = Path(result).real_path(); } else { result = Path(env_value).real_path(); } if (!result.join("src") .join("router") .join("src") .join("router_app.cc") .is_regular()) { throw std::runtime_error( "Source directory not available. Use CMAKE_SOURCE_DIR environment " "variable; was " + result.str()); } return result; } Path get_envvar_path(const std::string &envvar, Path alternative = Path()) { char *env_value = std::getenv(envvar.c_str()); Path result; if (env_value == nullptr) { result = alternative; } else { result = Path(env_value).real_path(); } return result; } const std::string get_cwd() { char buffer[FILENAME_MAX]; if (!getcwd(buffer, FILENAME_MAX)) { throw std::runtime_error("getcwd failed: " + std::string(strerror(errno))); } return std::string(buffer); } const std::string change_cwd(std::string &dir) { auto cwd = get_cwd(); #ifndef _WIN32 if (chdir(dir.c_str()) == -1) { #else if (!SetCurrentDirectory(dir.c_str())) { #endif throw std::runtime_error("chdir failed: " + mysqlrouter::get_last_error()); } return cwd; } size_t read_bytes_with_timeout(int sockfd, void *buffer, size_t n_bytes, uint64_t timeout_in_ms) { // returns epoch time (aka unix time, etc), expressed in milliseconds auto get_epoch_in_ms = []() -> uint64_t { using namespace std::chrono; time_point now = system_clock::now(); return static_cast( duration_cast(now.time_since_epoch()).count()); }; // calculate deadline time uint64_t now_in_ms = get_epoch_in_ms(); uint64_t deadline_epoch_in_ms = now_in_ms + timeout_in_ms; // read until 1 of 3 things happen: enough bytes were read, we time out or // read() fails size_t bytes_read = 0; while (true) { #ifndef _WIN32 ssize_t res = read(sockfd, static_cast(buffer) + bytes_read, n_bytes - bytes_read); #else WSASetLastError(0); ssize_t res = recv(sockfd, static_cast(buffer) + bytes_read, n_bytes - bytes_read, 0); #endif if (res == 0) { // reached EOF? return bytes_read; } if (get_epoch_in_ms() > deadline_epoch_in_ms) { throw std::runtime_error("read() timed out"); } if (res == -1) { #ifndef _WIN32 if (errno != EAGAIN) { throw std::runtime_error(std::string("read() failed: ") + strerror(errno)); } #else int err_code = WSAGetLastError(); if (err_code != 0) { throw std::runtime_error("recv() failed with error: " + get_last_error(err_code)); } #endif } else { bytes_read += static_cast(res); if (bytes_read >= n_bytes) { assert(bytes_read == n_bytes); return bytes_read; } } std::this_thread::sleep_for(std::chrono::milliseconds(10)); } } #ifdef _WIN32 std::string get_last_error(int err_code) { char message[512]; FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS | FORMAT_MESSAGE_ALLOCATE_BUFFER, nullptr, err_code, LANG_NEUTRAL, message, sizeof(message), nullptr); return std::string(message); } #endif void init_windows_sockets() { #ifdef _WIN32 WSADATA wsaData; int iResult = WSAStartup(MAKEWORD(2, 2), &wsaData); if (iResult != 0) { std::cerr << "WSAStartup() failed\n"; exit(1); } #endif } bool pattern_found(const std::string &s, const std::string &pattern) { bool result = false; try { std::smatch m; std::regex r(pattern); result = std::regex_search(s, m, r); } catch (const std::regex_error &e) { std::cerr << ">" << e.what(); } return result; } namespace { #ifndef _WIN32 int close_socket(int sock) { ::shutdown(sock, SHUT_RDWR); return close(sock); } #else int close_socket(SOCKET sock) { ::shutdown(sock, SD_BOTH); return closesocket(sock); } #endif } // namespace bool wait_for_port_ready(uint16_t port, std::chrono::milliseconds timeout, const std::string &hostname) { struct addrinfo hints, *ainfo; memset(&hints, 0, sizeof hints); hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; hints.ai_flags = AI_PASSIVE; // Valgrind needs way more time if (getenv("WITH_VALGRIND")) { timeout *= 10; } int status = getaddrinfo(hostname.c_str(), std::to_string(port).c_str(), &hints, &ainfo); if (status != 0) { throw std::runtime_error( std::string("wait_for_port_ready(): getaddrinfo() failed: ") + gai_strerror(status)); } std::shared_ptr exit_freeaddrinfo(nullptr, [&](void *) { freeaddrinfo(ainfo); }); const auto MSEC_STEP = 10ms; const auto started = std::chrono::steady_clock::now(); do { auto sock_id = socket(ainfo->ai_family, ainfo->ai_socktype, ainfo->ai_protocol); if (sock_id < 0) { throw std::runtime_error("wait_for_port_ready(): socket() failed: " + std::to_string(mysqlrouter::get_socket_errno())); } std::shared_ptr exit_close_socket( nullptr, [&](void *) { close_socket(sock_id); }); status = connect(sock_id, ainfo->ai_addr, ainfo->ai_addrlen); if (status < 0) { const auto step = std::min(timeout, MSEC_STEP); std::this_thread::sleep_for(std::chrono::milliseconds(step)); timeout -= step; } } while (status < 0 && timeout > std::chrono::steady_clock::now() - started); return status >= 0; } void init_keyring(std::map &default_section, const std::string &keyring_dir, const std::string &user /*= "mysql_router1_user"*/, const std::string &password /*= "root"*/) { // init keyring const std::string masterkey_file = Path(keyring_dir).join("master.key").str(); const std::string keyring_file = Path(keyring_dir).join("keyring").str(); mysql_harness::init_keyring(keyring_file, masterkey_file, true); mysql_harness::Keyring *keyring = mysql_harness::get_keyring(); keyring->store(user, "password", password); mysql_harness::flush_keyring(); mysql_harness::reset_keyring(); // add relevant config settings to [DEFAULT] section default_section["keyring_path"] = keyring_file; default_section["master_key_path"] = masterkey_file; } namespace { bool real_find_in_file( const std::string &file_path, const std::function &predicate, std::ifstream &in_file, std::ios::streampos &cur_pos) { if (!in_file.is_open()) { in_file.clear(); Path file(file_path); in_file.open(file.c_str(), std::ifstream::in); if (!in_file) { throw std::runtime_error("Error opening file " + file.str()); } cur_pos = in_file.tellg(); // initialize properly } else { // set current position to the end of what was already read in_file.clear(); in_file.seekg(cur_pos); } std::string line; while (std::getline(in_file, line)) { cur_pos = in_file.tellg(); if (predicate(line)) { return true; } } return false; } } // namespace bool find_in_file(const std::string &file_path, const std::function &predicate, std::chrono::milliseconds sleep_time) { const auto STEP = std::chrono::milliseconds(100); std::ifstream in_file; std::ios::streampos cur_pos; do { try { // This is proxy function to account for the fact that I/O can sometimes // be slow. if (real_find_in_file(file_path, predicate, in_file, cur_pos)) return true; } catch (const std::runtime_error &) { // report I/O error only on the last attempt if (sleep_time == std::chrono::milliseconds(0)) { std::cerr << " find_in_file() failed, giving up." << std::endl; throw; } } const auto sleep_for = std::min(STEP, sleep_time); std::this_thread::sleep_for(sleep_for); sleep_time -= sleep_for; } while (sleep_time > std::chrono::milliseconds(0)); return false; } std::string get_file_output(const std::string &file_name, const std::string &file_path) { return get_file_output(file_path + "/" + file_name); } std::string get_file_output(const std::string &file_name) { Path file(file_name); std::ifstream in_file; in_file.open(file.c_str(), std::ifstream::in); if (!in_file) { return "Could not open file " + file.str() + " for reading."; } std::string result((std::istreambuf_iterator(in_file)), std::istreambuf_iterator()); return result; } void connect_client_and_query_port(unsigned router_port, std::string &out_port, bool should_fail) { using mysqlrouter::MySQLSession; MySQLSession client; if (should_fail) { try { client.connect("127.0.0.1", router_port, "username", "password", "", ""); } catch (const std::exception &exc) { if (std::string(exc.what()).find("Error connecting to MySQL server") != std::string::npos) { out_port = ""; return; } else throw; } throw std::runtime_error( "connect_client_and_query_port: did not fail as expected"); } else { client.connect("127.0.0.1", router_port, "username", "password", "", ""); } std::unique_ptr result{ client.query_one("select @@port")}; if (nullptr == result.get()) { throw std::runtime_error( "connect_client_and_query_port: error querying the port"); } if (1u != result->size()) { throw std::runtime_error( "connect_client_and_query_port: wrong number of columns returned " + std::to_string(result->size())); } out_port = std::string((*result)[0]); }