polardbxengine/router/tests/helpers/router_test_helpers.cc

420 lines
12 KiB
C++

/*
Copyright (c) 2015, 2019, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.
This program is also distributed with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have included with MySQL.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
#include "router_test_helpers.h"
#include <cassert>
#include <cerrno>
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <regex>
#include <stdexcept>
#include <thread>
#ifndef _WIN32
#include <sys/socket.h>
#include <unistd.h>
#else
#include <direct.h>
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#define getcwd _getcwd
typedef long ssize_t;
#endif
#include "keyring/keyring_manager.h"
#include "mysql/harness/filesystem.h"
#include "mysqlrouter/mysql_session.h"
#include "mysqlrouter/utils.h"
using mysql_harness::Path;
using namespace std::chrono_literals;
Path get_cmake_source_dir() {
Path result;
// PB2 specific source location
char *env_pb2workdir = std::getenv("PB2WORKDIR");
char *env_sourcename = std::getenv("SOURCENAME");
char *env_tmpdir = std::getenv("TMPDIR");
if ((env_pb2workdir && env_sourcename && env_tmpdir) &&
(strlen(env_pb2workdir) && strlen(env_tmpdir) &&
strlen(env_sourcename))) {
result = Path(env_tmpdir);
result.append(Path(env_sourcename));
if (result.exists()) {
return result;
}
}
char *env_value = std::getenv("CMAKE_SOURCE_DIR");
if (env_value == nullptr) {
// try a few places
result = Path(get_cwd()).join("..");
result = Path(result).real_path();
} else {
result = Path(env_value).real_path();
}
if (!result.join("src")
.join("router")
.join("src")
.join("router_app.cc")
.is_regular()) {
throw std::runtime_error(
"Source directory not available. Use CMAKE_SOURCE_DIR environment "
"variable; was " +
result.str());
}
return result;
}
Path get_envvar_path(const std::string &envvar, Path alternative = Path()) {
char *env_value = std::getenv(envvar.c_str());
Path result;
if (env_value == nullptr) {
result = alternative;
} else {
result = Path(env_value).real_path();
}
return result;
}
const std::string get_cwd() {
char buffer[FILENAME_MAX];
if (!getcwd(buffer, FILENAME_MAX)) {
throw std::runtime_error("getcwd failed: " + std::string(strerror(errno)));
}
return std::string(buffer);
}
const std::string change_cwd(std::string &dir) {
auto cwd = get_cwd();
#ifndef _WIN32
if (chdir(dir.c_str()) == -1) {
#else
if (!SetCurrentDirectory(dir.c_str())) {
#endif
throw std::runtime_error("chdir failed: " + mysqlrouter::get_last_error());
}
return cwd;
}
size_t read_bytes_with_timeout(int sockfd, void *buffer, size_t n_bytes,
uint64_t timeout_in_ms) {
// returns epoch time (aka unix time, etc), expressed in milliseconds
auto get_epoch_in_ms = []() -> uint64_t {
using namespace std::chrono;
time_point<system_clock> now = system_clock::now();
return static_cast<uint64_t>(
duration_cast<milliseconds>(now.time_since_epoch()).count());
};
// calculate deadline time
uint64_t now_in_ms = get_epoch_in_ms();
uint64_t deadline_epoch_in_ms = now_in_ms + timeout_in_ms;
// read until 1 of 3 things happen: enough bytes were read, we time out or
// read() fails
size_t bytes_read = 0;
while (true) {
#ifndef _WIN32
ssize_t res = read(sockfd, static_cast<char *>(buffer) + bytes_read,
n_bytes - bytes_read);
#else
WSASetLastError(0);
ssize_t res = recv(sockfd, static_cast<char *>(buffer) + bytes_read,
n_bytes - bytes_read, 0);
#endif
if (res == 0) { // reached EOF?
return bytes_read;
}
if (get_epoch_in_ms() > deadline_epoch_in_ms) {
throw std::runtime_error("read() timed out");
}
if (res == -1) {
#ifndef _WIN32
if (errno != EAGAIN) {
throw std::runtime_error(std::string("read() failed: ") +
strerror(errno));
}
#else
int err_code = WSAGetLastError();
if (err_code != 0) {
throw std::runtime_error("recv() failed with error: " +
get_last_error(err_code));
}
#endif
} else {
bytes_read += static_cast<size_t>(res);
if (bytes_read >= n_bytes) {
assert(bytes_read == n_bytes);
return bytes_read;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}
#ifdef _WIN32
std::string get_last_error(int err_code) {
char message[512];
FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
FORMAT_MESSAGE_ALLOCATE_BUFFER,
nullptr, err_code, LANG_NEUTRAL, message, sizeof(message),
nullptr);
return std::string(message);
}
#endif
void init_windows_sockets() {
#ifdef _WIN32
WSADATA wsaData;
int iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
if (iResult != 0) {
std::cerr << "WSAStartup() failed\n";
exit(1);
}
#endif
}
bool pattern_found(const std::string &s, const std::string &pattern) {
bool result = false;
try {
std::smatch m;
std::regex r(pattern);
result = std::regex_search(s, m, r);
} catch (const std::regex_error &e) {
std::cerr << ">" << e.what();
}
return result;
}
namespace {
#ifndef _WIN32
int close_socket(int sock) {
::shutdown(sock, SHUT_RDWR);
return close(sock);
}
#else
int close_socket(SOCKET sock) {
::shutdown(sock, SD_BOTH);
return closesocket(sock);
}
#endif
} // namespace
bool wait_for_port_ready(uint16_t port, std::chrono::milliseconds timeout,
const std::string &hostname) {
struct addrinfo hints, *ainfo;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE;
// Valgrind needs way more time
if (getenv("WITH_VALGRIND")) {
timeout *= 10;
}
int status = getaddrinfo(hostname.c_str(), std::to_string(port).c_str(),
&hints, &ainfo);
if (status != 0) {
throw std::runtime_error(
std::string("wait_for_port_ready(): getaddrinfo() failed: ") +
gai_strerror(status));
}
std::shared_ptr<void> exit_freeaddrinfo(nullptr,
[&](void *) { freeaddrinfo(ainfo); });
const auto MSEC_STEP = 10ms;
const auto started = std::chrono::steady_clock::now();
do {
auto sock_id =
socket(ainfo->ai_family, ainfo->ai_socktype, ainfo->ai_protocol);
if (sock_id < 0) {
throw std::runtime_error("wait_for_port_ready(): socket() failed: " +
std::to_string(mysqlrouter::get_socket_errno()));
}
std::shared_ptr<void> exit_close_socket(
nullptr, [&](void *) { close_socket(sock_id); });
status = connect(sock_id, ainfo->ai_addr, ainfo->ai_addrlen);
if (status < 0) {
const auto step = std::min(timeout, MSEC_STEP);
std::this_thread::sleep_for(std::chrono::milliseconds(step));
timeout -= step;
}
} while (status < 0 && timeout > std::chrono::steady_clock::now() - started);
return status >= 0;
}
void init_keyring(std::map<std::string, std::string> &default_section,
const std::string &keyring_dir,
const std::string &user /*= "mysql_router1_user"*/,
const std::string &password /*= "root"*/) {
// init keyring
const std::string masterkey_file = Path(keyring_dir).join("master.key").str();
const std::string keyring_file = Path(keyring_dir).join("keyring").str();
mysql_harness::init_keyring(keyring_file, masterkey_file, true);
mysql_harness::Keyring *keyring = mysql_harness::get_keyring();
keyring->store(user, "password", password);
mysql_harness::flush_keyring();
mysql_harness::reset_keyring();
// add relevant config settings to [DEFAULT] section
default_section["keyring_path"] = keyring_file;
default_section["master_key_path"] = masterkey_file;
}
namespace {
bool real_find_in_file(
const std::string &file_path,
const std::function<bool(const std::string &)> &predicate,
std::ifstream &in_file, std::ios::streampos &cur_pos) {
if (!in_file.is_open()) {
in_file.clear();
Path file(file_path);
in_file.open(file.c_str(), std::ifstream::in);
if (!in_file) {
throw std::runtime_error("Error opening file " + file.str());
}
cur_pos = in_file.tellg(); // initialize properly
} else {
// set current position to the end of what was already read
in_file.clear();
in_file.seekg(cur_pos);
}
std::string line;
while (std::getline(in_file, line)) {
cur_pos = in_file.tellg();
if (predicate(line)) {
return true;
}
}
return false;
}
} // namespace
bool find_in_file(const std::string &file_path,
const std::function<bool(const std::string &)> &predicate,
std::chrono::milliseconds sleep_time) {
const auto STEP = std::chrono::milliseconds(100);
std::ifstream in_file;
std::ios::streampos cur_pos;
do {
try {
// This is proxy function to account for the fact that I/O can sometimes
// be slow.
if (real_find_in_file(file_path, predicate, in_file, cur_pos))
return true;
} catch (const std::runtime_error &) {
// report I/O error only on the last attempt
if (sleep_time == std::chrono::milliseconds(0)) {
std::cerr << " find_in_file() failed, giving up." << std::endl;
throw;
}
}
const auto sleep_for = std::min(STEP, sleep_time);
std::this_thread::sleep_for(sleep_for);
sleep_time -= sleep_for;
} while (sleep_time > std::chrono::milliseconds(0));
return false;
}
std::string get_file_output(const std::string &file_name,
const std::string &file_path) {
return get_file_output(file_path + "/" + file_name);
}
std::string get_file_output(const std::string &file_name) {
Path file(file_name);
std::ifstream in_file;
in_file.open(file.c_str(), std::ifstream::in);
if (!in_file) {
return "Could not open file " + file.str() + " for reading.";
}
std::string result((std::istreambuf_iterator<char>(in_file)),
std::istreambuf_iterator<char>());
return result;
}
void connect_client_and_query_port(unsigned router_port, std::string &out_port,
bool should_fail) {
using mysqlrouter::MySQLSession;
MySQLSession client;
if (should_fail) {
try {
client.connect("127.0.0.1", router_port, "username", "password", "", "");
} catch (const std::exception &exc) {
if (std::string(exc.what()).find("Error connecting to MySQL server") !=
std::string::npos) {
out_port = "";
return;
} else
throw;
}
throw std::runtime_error(
"connect_client_and_query_port: did not fail as expected");
} else {
client.connect("127.0.0.1", router_port, "username", "password", "", "");
}
std::unique_ptr<MySQLSession::ResultRow> result{
client.query_one("select @@port")};
if (nullptr == result.get()) {
throw std::runtime_error(
"connect_client_and_query_port: error querying the port");
}
if (1u != result->size()) {
throw std::runtime_error(
"connect_client_and_query_port: wrong number of columns returned " +
std::to_string(result->size()));
}
out_port = std::string((*result)[0]);
}