Enabling Fast Data using memory-centric computing with Tachyon
Fast Data: What And Why
- What: The evolution of Big Data (Volume, Velocity, Variety, Veracity) with a focus on streaming technologies capable of delivering, processing, analyzing, and storing events as fast as they come in. In real time.
- Why: Fast data gives organizations the ability to test, experiment, and fail forward faster, by cycling through these tests at higher speed in order to learn and succeed more quickly.
FAST DATA: NON-FUNCTIONAL REQUIREMENTS
- DATA SOURCE AND DATA SINK CONNECTORS: Read, write, and combine multiple heterogeneous data sources or data destinations via a common interface.
- BATCH AND PIPELINE PROCESSING: Non-continuous (non-real-time) processing of massive amounts of data from start to completion.
- REAL-TIME STREAMING: Process and aggregate real-time events, data streams, or sensor data.
- OLAP PROCESSING: Support for analytical/informational processing involving complex queries and aggregations.
- IN-MEMORY PROCESSING: Use memory for speed, staging, and sharing data across jobs.
- DISTRIBUTED INTENSIVE I/O: Support for fast, intensive, and scalable reads and writes.
Why IN-MEMORY PROCESSING
- Pros
- The cost per capacity of memory keeps falling, which makes it possible to handle huge amounts of data in memory
- Throughput of RAM is increasing exponentially
- Many computation frameworks (e.g. Spark and Flink) leverage memory
- Cons
- GC
- Data sharing across frameworks
IN-MEMORY PROCESSING: Throughput
IN-MEMORY PROCESSING: Cost
IN-MEMORY PROCESSING: Known Problems
- Garbage collection: working on the heap also brings in the garbage collector, which can slow jobs down or even cause failures
- Data sharing across frameworks: objects on the heap must be serialized and deserialized somehow in order to be shared across frameworks
- Fault tolerance (replication?)
- What if the data does not fit in memory?
Tachyon
“Tachyon is a memory-centric distributed storage system
enabling reliable data sharing at memory-speed across cluster frameworks.”
AMPLab BDAS
“BDAS, the Berkeley Data Analytics Stack, is an open source software stack that integrates software
components being built by the AMPLab to make sense of Big Data.”
ZooKeeper
- ZooKeeper is a framework used to achieve fault tolerance; a leader master node is elected every time the cluster starts up (and again if the current leader fails).
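A sketch of the corresponding configuration; property names follow the Tachyon 0.8 docs and hosts are placeholders. In fault-tolerant mode, clients address the current leader through the tachyon-ft:// scheme instead of tachyon://.

tachyon.zookeeper.enabled=true
tachyon.zookeeper.address=zkHost1:2181,zkHost2:2181,zkHost3:2181
# clients then connect via tachyon-ft://master1:19998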
Master
- Manages the global metadata of the system
- Serves as the clients' endpoint for reading or modifying this metadata
- All workers periodically send heartbeats to the master
Worker
- Manages the local resources allocated to Tachyon
- Stores data as blocks and serves client requests to read or write data by reading or creating blocks
Client
- Provides users a gateway to interact with the Tachyon servers
- Exposes a file system API (Java)
- Initiates communication with the master for metadata operations and with workers to read and write data
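A minimal sketch of the native client; class and method names follow the Tachyon 0.8 Java API docs, but exact signatures may vary between releases:

import tachyon.TachyonURI;
import tachyon.client.file.FileInStream;
import tachyon.client.file.FileOutStream;
import tachyon.client.file.TachyonFile;
import tachyon.client.file.TachyonFileSystem;

// Entry point: talks to the master for all metadata operations.
TachyonFileSystem tfs = TachyonFileSystem.TachyonFileSystemFactory.get();

// Write: the master records the metadata, the data blocks go to a worker.
FileOutStream out = tfs.getOutStream(new TachyonURI("/myFile"));
out.write("hello tachyon".getBytes());
out.close();

// Read: resolve the file via the master, then stream its blocks from workers.
TachyonFile file = tfs.open(new TachyonURI("/myFile"));
FileInStream in = tfs.getInStream(file);
byte[] buf = new byte[64];
int n = in.read(buf);
in.close();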
Storage types
- Tachyon Storage
- multi-layer (tiered) storage for user files
- may be volatile
- accessible via the tachyon://[path] scheme
- Underfs storage: a third-party file system used for persisting the Tachyon journal and lineage data, and optionally for syncing user files
underfs
Tachyon relies on another storage system to safely store metadata and user data (if configured). Supported file systems:
- Local
- HDFS
- S3
- Swift
- GlusterFS
- your own implementation: extend tachyon.underfs.UnderFileSystem
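For example, pointing Tachyon at HDFS is a single property (namenode host and path are placeholders):

tachyon.underfs.address=hdfs://namenode:9000/tachyon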
write types
- MUST_CACHE: Write the file to Tachyon storage, or fail the operation.
- CACHE_THROUGH: Write the file synchronously to the under fs, and also try to write it to Tachyon storage.
- THROUGH: Write the file synchronously to the under fs, skipping Tachyon storage.
- ASYNC_THROUGH: Write the file asynchronously to the under fs.
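A sketch of selecting a write type from the Java client; the OutStreamOptions builder and its setWriteType method are assumptions modeled on the 0.8 client, so verify them against your release:

import tachyon.TachyonURI;
import tachyon.client.ClientContext;
import tachyon.client.WriteType;
import tachyon.client.file.FileOutStream;
import tachyon.client.file.TachyonFileSystem;
import tachyon.client.file.options.OutStreamOptions;

TachyonFileSystem tfs = TachyonFileSystem.TachyonFileSystemFactory.get();
// Assumed builder API: CACHE_THROUGH persists synchronously to the under fs
// while also trying to cache the file in Tachyon storage.
OutStreamOptions options = new OutStreamOptions.Builder(ClientContext.getConf())
    .setWriteType(WriteType.CACHE_THROUGH)
    .build();
FileOutStream out = tfs.getOutStream(new TachyonURI("/outputFile"), options);
out.write("persisted and cached".getBytes());
out.close();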
Tiered Storage
- Tachyon manages other storage types in addition to memory, since memory capacity may be limited in some deployments.
- Tachyon automatically manages blocks between all the configured tiers
Tiered Storage
Tiers are fully configurable; here is a typical scenario:
Configuring tiered storage
tachyon.worker.tieredstore.level.max
tachyon.worker.tieredstore.level{x}.alias
tachyon.worker.tieredstore.level{x}.dirs.quota
tachyon.worker.tieredstore.level{x}.dirs.path
tachyon.worker.tieredstore.level{x}.reserved.ratio
tachyon.worker.tieredstore.level.max=2
tachyon.worker.tieredstore.level0.alias=MEM
tachyon.worker.tieredstore.level0.dirs.path=/mnt/ramdisk
tachyon.worker.tieredstore.level0.dirs.quota=100GB
tachyon.worker.tieredstore.level0.reserved.ratio=0.2
tachyon.worker.tieredstore.level1.alias=HDD
tachyon.worker.tieredstore.level1.dirs.path=/mnt/hdd1,/mnt/hdd2,/mnt/hdd3
tachyon.worker.tieredstore.level1.dirs.quota=2TB,5TB,500GB
tachyon.worker.tieredstore.level1.reserved.ratio=0.1
Writing Data
When a user writes a new block, it is written to the top tier by default (a custom allocator can be used if the default behavior is not desired). If there is not enough space for the block in the top tier, then the evictor is triggered in order to free space for the new block.
Reading Data
Tachyon simply reads the block from where it is already stored. If Tachyon is configured with multiple tiers, the block will not necessarily be read from the top tier, since it may have been transparently moved to a lower tier.
Block Promotion
Reading data with the TachyonStorageType.PROMOTE configuration ensures the data is first transferred to the top tier before it is read from the worker. This can also be used as a data management strategy, explicitly moving hot data to higher tiers.
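A sketch of a promoting read; as above, the InStreamOptions builder shown is an assumption modeled on the 0.8 client:

import tachyon.TachyonURI;
import tachyon.client.ClientContext;
import tachyon.client.TachyonStorageType;
import tachyon.client.file.FileInStream;
import tachyon.client.file.TachyonFile;
import tachyon.client.file.TachyonFileSystem;
import tachyon.client.file.options.InStreamOptions;

TachyonFileSystem tfs = TachyonFileSystem.TachyonFileSystemFactory.get();
TachyonFile file = tfs.open(new TachyonURI("/hotFile"));
// Assumed builder API: PROMOTE moves the blocks to the top tier before reading.
InStreamOptions options = new InStreamOptions.Builder(ClientContext.getConf())
    .setTachyonStorageType(TachyonStorageType.PROMOTE)
    .build();
FileInStream in = tfs.getInStream(file, options);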
Allocators
Tachyon uses allocators for choosing locations for writing new blocks. Tachyon has a framework for customized allocators, but there are a few default implementations. The existing allocators in Tachyon are:
- GreedyAllocator: allocates the new block to the first storage directory that has enough space.
- MaxFreeAllocator: allocates the block to the storage directory with the most free space.
- RoundRobinAllocator: allocates the block in the highest tier with space, rotating among its storage directories.
- your own implementation: implement tachyon.worker.block.allocator.Allocator
tachyon.worker.allocator.class=tachyon.worker.block.allocator.MaxFreeAllocator
Evictors
Tachyon uses evictors to decide which blocks to move to a lower tier when space needs to be freed. Tachyon supports custom evictors, and implementations include:
- GreedyEvictor: evicts arbitrary blocks until the required size is freed.
- LRUEvictor: evicts the least-recently-used blocks until the required size is freed.
- LRFUEvictor: evicts blocks based on least-recently-used and least-frequently-used with a configurable weight. If the weight is completely biased toward least-recently-used, the behavior is the same as the LRUEvictor.
- PartialLRUEvictor: evicts based on least-recently-used, but chooses the StorageDir with the maximum free space and only evicts from that StorageDir.
- your own implementation: implement tachyon.worker.block.evictor.Evictor
tachyon.worker.evictor.class=tachyon.worker.block.evictor.LRUEvictor
Lineage
Tachyon can achieve high-throughput writes and reads without compromising fault tolerance by using lineage:
- lost output is recovered by re-executing the jobs that created it.
- applications write output into memory, and Tachyon periodically checkpoints the output into the under file system asynchronously.
In case of failures, Tachyon launches job recomputation to restore the lost files.
Lineage assumes that jobs are deterministic.
Lineage API (alpha)
TachyonLineage tl = TachyonLineage.get();
// input file paths
TachyonURI input1 = new TachyonURI("/inputFile1");
TachyonURI input2 = new TachyonURI("/inputFile2");
List<TachyonURI> inputFiles = Lists.newArrayList(input1, input2);
// output file paths
TachyonURI output = new TachyonURI("/outputFile");
List<TachyonURI> outputFiles = Lists.newArrayList(output);
// command-line job to re-run if the output is lost (recomputation logged to the given file)
JobConf conf = new JobConf("/tmp/recompute.log");
CommandLineJob job = new CommandLineJob("my-spark-job.sh", conf);
// register the lineage: the outputs can now be recomputed from the inputs by re-running the job
long lineageId = tl.createLineage(inputFiles, outputFiles, job);
Transparent Naming
Transparent naming maintains an identity between the Tachyon namespace and the underlying storage system namespace.
Transparent naming
- object paths are replicated in the under fs.
- files in the underfs are transparently discovered even if they were not created through Tachyon; their metadata is loaded into memory the first time they are accessed.
Unified Namespace
- Tachyon provides a mounting API that makes it possible to use Tachyon to access data across multiple data sources.
bool mount(String tachyonPath, String ufsPath);
bool unmount(String tachyonPath);
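For example (bucket and paths are hypothetical; the calls mirror the signatures above, assuming the Java client exposes them):

// Mount an S3 bucket into the Tachyon namespace, then unmount it.
TachyonFileSystem tfs = TachyonFileSystem.TachyonFileSystemFactory.get();
tfs.mount(new TachyonURI("/s3-data"), new TachyonURI("s3n://my-bucket/data"));
// tachyon://s3-data/... now resolves into the bucket
tfs.unmount(new TachyonURI("/s3-data"));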
Tachyon shell
- provides basic file system operations (ls, rm, cat, mv, tail, touch, etc.)
- provides "special" operations
- persist: persists data from Tachyon storage to the under fs
- pin / unpin: marks / unmarks a file or folder as pinned in Tachyon (i.e. no eviction will be applied)
- load: loads data from under storage into Tachyon storage
- report: marks a file as lost to the Tachyon master; this causes the master to schedule a recomputation job to regenerate the file
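Typical usage through the tfs entry point (paths are hypothetical):

$ ./bin/tachyon tfs ls /
$ ./bin/tachyon tfs pin /hot/dataset      # exempt from eviction
$ ./bin/tachyon tfs persist /outputFile   # copy from Tachyon storage to the under fs
$ ./bin/tachyon tfs load /cold/dataset    # pull from the under fs into Tachyon storage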
Remote Write support
$ java -cp SW/tachyon-0.8.2/assembly/target/tachyon-assemblies-0.8.2-jar-with-dependencies.jar \
    -Dtachyon.master.hostname=backpressure-master \
    tachyon.shell.TfsShell copyFromLocal icla.pdf /
ex. Spark memory management
(diagram: execution engine and storage engine in the same Spark JVM)
- temporary data and the block manager live in the same Spark job
- GC is involved in both of them
- data is shared between jobs through the file system
ex. Spark OFF_HEAP storage
(diagram: execution engine and storage engine in separate processes)
- the storage engine stores data in a different process
- GC is involved only in the execution engine
- data is shared between jobs using Tachyon at memory speed
ex. Spark OFF_HEAP storage
def persist(newLevel: StorageLevel): this.type
class StorageLevel private(
private var _useDisk: Boolean,
private var _useMemory: Boolean,
private var _useOffHeap: Boolean,
private var _deserialized: Boolean,
private var _replication: Int = 1)
testRdd.persist(StorageLevel.OFF_HEAP)
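For OFF_HEAP to actually land in Tachyon, Spark must be pointed at the Tachyon master; a sketch assuming Spark 1.5+ property names (earlier releases used spark.tachyonStore.url; host and directory are placeholders):

spark.externalBlockStore.url=tachyon://tachyon-master:19998
spark.externalBlockStore.baseDir=/tmp_spark_tachyon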
Enabling Fast Data using memory-centric computing with Tachyon
by Saverio Veltri / @save_veltri