Feed Discrepancy Research  – Background – Digging in



Feed Discrepancy Research  – Background – Digging in

0 0


tech_talk_presentation


On Github kerinin / tech_talk_presentation

Feed Discrepancy Research 

http://kerinin.github.io/tech_talk_presentation

Background

The OIB Data Pipeline

(some of) Our Stack

Daily Feed

'Read Rate' workers scan all Subscriber accounts, fetch messages received in the past 3 days. Each message converted into a line containing timestamps, subject, sending domain, etc.  Data pushed to Amazon Daily Pig scripts rolls up & anonymizes this data, sends to CO

Hourly Feed

'Sync' workers continuously scan accounts, fetch messages which haven't been seen yet (based on message-id) Each message is parsed for Campaign ID's, messages from 'watched' domains are parsed for subject lines Counters in Redis are incremented for each found message Redis counts are sent to CO daily

Code Paths, Account -> CO

Each of the following involves its own codebase, with minimal shared code:
  • Hourly Sync (Yahoo)
  • Hourly Sync (Gmail)
  • Hourly Sync (IMAP)
  • Daily Scan (Yahoo)
  • Daily Scan (IMAP)
  • Pig Anonymization (Yahoo)
  • Pig Anonymization (IMAP)

Because history

Sample Data

Hourly:
Daily (truncated):

The Problem

Daily Aggregate values are different for Daily & Hourly feeds
Discrepancies seem to occur across providers, folders, and senders
WTF?

Digging in

Debugging / Research process

1. Is this *really* my problem?

  • Definitely my problem

2. Are there any obvious problems in the codebase?

(10k lines of code omitted for clarity)
  • Yahoo Daily scan uses offset-based iteration to fetch messages
  • No smoking guns

3. Is the problem specific to some subset of Campaign ID's?

  • Not based on the axes available
  • But the distribution isn't the same for Gmail & Yahoo

4. Are we dropping messages somewhere in our infrastructure?

Log messages received by the raw queries Log messages once they're re-formatted for export Log messages after they're transferred between processes Count messages exiting our pipeline

  • We lose less than 1% of messages in both feeds*

*assuming the process doesn't fail before we can log the message counts

5. Is the discrepancy being caused by the CID/SLS parsing code?

Translate Hourly message-level data structures into equivalent Daily message-level data structures Push the synthetic Daily data to its own S3 bucket daily Compare resulting data for equivalent time windows

(cont)

 
  • The discrepancy exists across all data partitions considered
  • The SLS/CID processes can't be the only cause

6. Where's happening to individual messages?

7. Are there obvious patterns in the missing messages?

  • Gmail 'All Mail' not being scanned
  • No smoking gun

8. What do the dates we're seeing actually refer to?

Date message received:

  • Yahoo MessageArray["received_date"]
  • Imap INTERNALDATE

Date Header:

  • Yahoo Inboxservices["d_t"]

Both providers timestamp in seconds since the epoch, UTC

  • Our 'SINCE' queries should be fetching the correct time window

9. Do we get the same results if we run the same code concurrently?

  • Around .1% discrepancy

10. Do we get the same results if we run the same code after 24h?

(looking at a shared time window)?
4.87% of messages seen on 4/10's Daily scan were not seen on 4/11's Daily scan
  • Intra-day results are far more stable than Daily/Hourly results

11. Are these messages *actually* disappearing?

Original Corpus: 109,802 messages Missing from Original: 37,428 messages Found on remote server: 1,258 messages 'Found' messages present in subsequent scan: 194

  • Subsequent scans include (some of) the missing messages
  • Message dropping (sometimes) has nothing to do with the message itself

13. Are we dropping messages due to error handling failures?

  • 'Missing' rate drops from 75.8% for all users to 47.9% for successfully scanned users

14. How much of the source data falls outside the expected time window?

  • 15% of it.  Shit.

Results*

*as of right now

Gmail Daily Scans don't check 'All Mail'

If a user clicks 'Archive' on an email with no other tags, we will not fetch that message during the daily scans
  • Re-implement Gmail scan to fetch messages from 'All Mail', 'Trash' and 'Spam'
  • Use X-GM-LABELS to infer mailbox memberships

Yahoo Daily Scan iterates backwards through message history using an offset counter

If a message arrives during our scan, the offset will have an off-by-one error.
If a message is moved/deleted during our scan, the same.
  • Fetch the list of all message identifiers first, then iterate through them fetching the full body & headers.

Error Handling

A disproportionate amount of 'missing' messages were seen in accounts that experienced errors, indicating that we're probably receiving partial datasets from these accounts
  • Try to buffer data during scans and flush the buffer at the end of the scan

15% of Hourly messages are from the past

These messages will be incorrectly aggregated with "today's" data by the Hourly scan, but not by the Daily scans
  • Filter Hourly data to 'today' for CID & SLS feeds

Lessons Learned

Junk In: Junk Out

Take the time to build the data source you wish you had

Data ≠ Information

Build tools to make large data sources useful
Operate on the full data as much as possible - edge cases are everywhere

Posix Tools are your Friends

They're fast, flexible, and are often a better fit than 'heavier' solutions
(I'm looking at you, Pig)

A graph is worth a thousand statistics

Aggregates are great for quantitative comparisons and quick peeks, but they only go so far

Tool Tour

Some goodies worth sharing

Posix Tricks

Globbing
cat *_foo/bar

cut

cut -f1-3   # => extracts fields 1-3.
cut -f5-    # => extracts all fields after 4
cut -c10-20 # => extracts characters 10 through 20

sort

sort -t$'\t' -k10    # => sort by the 10th field, with tab-delimited fields
sort -k3n -k10       # => primary sort on 3rd field as numeric, secondary on 10
sort -n              # => reverse sort

uniq

sort | uniq        # => returns unique elements (must be sorted first)
sort | uniq -c     # => returns unique elements with counts

Posix Tricks cont.

fgrep
fgrep '/foobar/'      # => treats '/foobar/' as a string, not a regex (faster)
awk, sed
awk 'BEGIN {FS="\t"}; { if( $3 % 10 == 2) print $5 / $4}'
sed 's/[[:space:]]+/\ /g'
Subcommands & File Descriptors
vimdiff <(sort -k5 foo | cut -f3-) <(sort -k5 bar | cut -f3-)
cut -f11 foo | vim -
echo "$(wc -l foo) / $(wc -l bar)" | bc

Posix Tricks cont.

Some of my convenience functions
function summation() { paste -sd+ - | bc }
echo "1 2 3" | summation  # => 6

function compact() { sed '/^$/d' }
echo "1\n\n2\n3" | compact # => '1\n2\n3'

function quantiles()
{
  R --vanilla --slave -e "options(width = 400)
  data=scan(pipe('cat /dev/stdin')); quantile(data, seq(0,1,0.1));"
}
echo "1 2 3 4 5" | summation # => (10 quantiles)

function ecdf()
{
  R --vanilla --slave -e "options(width = 400)
  data=scan(pipe('cat /dev/stdin')); ecdf(data)($1);"
}
echo "1 2 3 4 5" | ecdf 3  # => 0.6

Pig Tricks

HashFNV
REGISTER s3://elasticmapreduce/libs/pig/0.9.1/piggybank-0.9.1-amzn.jar;
DEFINE HASH org.apache.pig.piggybank.evaluation.string.HashFNV();

with_guid = FOREACH records GENERATE
  CONCAT((chararray)HASH(subject), CONCAT(user_id, from)) as guid, 
  scanned_at..;

sampled = FILTER with_guid BY HASH(guid, 1000) == 0;
%declare
%declare EMR_HOST `uname -n`;
Metaprogramming
%declare INPUT_SCHEMA_C `echo '$INPUT_SCHEMA' | tr '~' ','`;
%declare FIELD_COUNT `echo '$INPUT_SCHEMA_C' | awk -F ',' '{ print NF }'`;
%declare INDEX_END `echo "$(( $FIELD_COUNT-1 ))"`;

data = LOAD '$INPUT_PATH' AS ($INPUT_SCHEMA_C);
less_data = FOREACH data GENERATE $4..$$INDEX_END;

Pig Tricks cont.

Sample By Hash
%declare BASE_INPUT_C `echo "$BASE_INTPUT" | tr '~' ','`;
%declare OTHER_INPUT_C `echo "$OTHER_INTPUT" | tr '~' ','`;

REGISTER s3://elasticmapreduce/libs/pig/0.9.1/piggybank-0.9.1-amzn.jar;
DEFINE HASH org.apache.pig.piggybank.evaluation.string.HashFNV();

input_b = LOAD '$BASE_INPUT_C' USING TextLoader() AS line;
input_o = LOAD '$OTHER_INPUT_C' USING TextLoader() AS line;

filtered_b = FILTER input_b BY 
  HASH(SUBSTRING(line, 0, $FIRST_N), 
  $SAMPLE_DENOMINATOR) == 0;
filtered_o = FILTER input_o BY 
  HASH(SUBSTRING(line, 0, $FIRST_N), 
  $SAMPLE_DENOMINATOR) == 0;

ordered_b = ORDER filtered_b BY *;
ordered_o = ORDER filtered_o BY *;

STORE ordered_b INTO '$BASE_OUTPUT';
STORE ordered_o INTO '$OTHER_OUTPUT';

Pig Tricks cont.

Diff Records
b =             LOAD '$BASE_C' AS ($INPUT_SCHEMA_C);

b_guid_fail =   FILTER b BY $0 IS NULL;
b_guid_fail =   FOREACH b_guid_fail GENERATE $1..;

joined =        COGROUP b BY (chararray)$0, o BY (chararray)$0;
only_b =        FILTER joined BY IsEmpty(o);
only_b =        FOREACH only_b GENERATE FLATTEN(b);
only_b =        FOREACH only_b GENERATE $1..;

both =          FILTER joined BY (NOT IsEmpty(b)) AND (NOT IsEmpty(o));
both_singular = FILTER both BY (SIZE(b) == 1) AND (SIZE(o) == 1);
both_multiple = FILTER both BY (SIZE(b) != 1) OR (SIZE(o) != 1);
both_flattened = FOREACH both_singular GENERATE FLATTEN(b), FLATTEN(o);
both_modified_raw = FILTER both_flattened BY $MODIFIED_QUERY_C;
both_unmodified_raw = FILTER both_flattened BY $UNMODIFIED_QUERY_C;
both_unmodified =   FOREACH both_unmodified_raw GENERATE $1..$$BASE_END;

both_from_b =       FOREACH both_modified_raw GENERATE $1..$$BASE_END;
both_modified_b =   FOREACH both_multiple GENERATE FLATTEN(b);
both_modified_b =   FOREACH both_modified_b GENERATE $1..; 

R Tricks

plot()
plot( log(data$x), log(data$y), ann=F, xlim=c(0,1), ylim=c(0,1), col='blue' )
plot( log(test$x), log(test$y), add=T, col='red' )
hist()
hist( data$x, xlim=c(0,10), breaks=quantiles(data$x, seq(0,1,0.1)) )
plot.ecdf()
plot.ecdf( data$x, xlim=c(0,10) )
abline()
abline(v=10)
Diagonal line
lines( par()$usr[1:2], par()$usr[3:4] )

Wishlist

aka, "Things that would have made this less of an ordeal"
  • Data pipeline structured as small, discrete operations
  • Ability to easily divert duplicate output data to new operations from any point within the pipeline
  • Capture ALL available data at the top of the data funnel
  • Staging environment for changes to the pipeline
  • Single, canonical data source

Fin.

Me

This Presentation

The codez